♣ state和checkpoint 关系:
1、state一般指一个具体的task/operator的状态【state数据默认保存在java的堆内存中】
2、而checkpoint是把state数据持久化存储了,表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态,保存在hdfs
♣ state 分类
1、Keyed State:基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state。
保存state的数据结构
ValueState<T>:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值
ListState<T>:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable<T>来遍历状态值
ReducingState<T>:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值
MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素
2、Operator State: Key无关的State,与Operator绑定的state,整个operator只对应一个state
保存state的数据结构 ListState<T>,代表:Kafka Connector
♣ checkpoint 配置
checkpoint的checkPointMode有两种,Exactly-once和At-least-once
Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)
♣ State Backend(状态的后端存储)
state会保存在taskmanager的内存中,
checkpoint会存储在JobManager的内存中。
state 的store和checkpoint的位置取决于State Backend的配置
env.setStateBackend(…) ,有3种
1 MemoryStateBackend
2 FsStateBackend
3 RocksDBStateBackend
♣ 修改State Backend的两种方式
第一种: 修改当前任务代码
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】
第二种:全局调整
修改flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
● 多个Checkpoint及从checkpoint恢复
在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数
state.checkpoints.num-retained: 20
退回指定的checkpoint
flink run -s \
hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata \
flink-job.jar
● Savepoint
checkPoint :应用定时触发,用于保存状态,会过期
内部应用失败重启的时候使用
savePoint:用户手动执行,是指向Checkpoint的指针,不会过期在升级的情况下使用
注意:为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员通过 uid(String) 方法手动的给算子赋予 ID,这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点(savepoint)将程序恢复回来。而这些自动生成的 ID 依赖于程序的结构,并且对代码的更改是很敏感的。因此,强烈建议用户手动的设置 ID
savepoint使用
1:在flink-conf.yaml中配置Savepoint存储位置
不是必须设置,但是设置后,后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
2:触发一个savepoint【直接触发或者在cancel的时候触发】
bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】
bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】
3:从指定的savepoint启动job
bin/flink run -s savepointPath [runArgs]
。