一、有状态计算
在flink的结构体系当中,有状态的计算可以说是flink非常重要的特性之一了。有状态的计算是指在程序计算过程中,在flink程序内部存储着计算产生的中间结果,然后可以提供给后续计算算子使用。其实这一点也非常好理解,流计算相对于离线计算,对未来会到来什么数据永远不可知,那么想要实时的更新计算结果,只能先把已经接收到的数据,计算完的结果状态进行保存,等待后续数据到来,与之前的状态数据进行整合运算。
在flink当中,状态数据可以是job manager的jvm堆内存或者是对外内存,也可以是本地的文件系统,也可以借助第三方的存储介质,例如Flink已经实现的RocksDB。
如上图所示,managed state是状态管理器,不同形式的状态数据存储,就对应着不同的状态管理器,这个后续会想说,Local State Backed是状态后端,用来泛指状态数据存储的位置,状态数据是由状态管理器来进行管理,计算算子会跟managed state进行交互信息,如图中的write和read,通过这样来实现有状态的计算。
二、flink状态类型及应用
2.1、Keyed State
在flink中根据数据集是否根据Key进行分区,将状态分为Keyed State和Operator State两种类型。Key State明显就是表示跟Key相关的一种State,只能用于KeyedStream类型数据集对应的Functions和Operation之上。相比较于Operator State,Keyed State事先按照Key对数据集进行了分区,每个key State仅对应一个Operator和Key的组合。当算子并行度发生变化时,自动重新分布Key State数据。
2.2、Operator State
与Key State不同的是,Operator State和并行的算子实例进行绑定,和数据元素中的Key无关,每个算子实例(task)持有所有数据元素中的一部分状态数据,Operator State支持当算子实例并行度发生变化时重新分配状态数据。
2.3、Keyed State和Operator State的存在形式
在flink当中,Keyed State和Operator State均具有两种形式,其中一种为托管状态(Manager State)形式,由Flink Runtime中控制和管理状态,并将状态数据转换成为内存Hash tables或者RocksDB的对象进行存储,并且将这些状态数据通过接口转换成checkpoint当中,以便于在flink发生故障的时候,能够从checkpoint当中进行恢复。
另外一种是原生状态(Raw State)形式,由算子自己管理数据结构,当触发checkpoint的时候,flink其实并不知道当前的数据状态,只是把数据状态转换成bytes存储到checkpoint当中,当从checkpoint进行故障恢复的时候,也需要自己在反序列化出状态的数据结构。
三、Checkpoint和Savepoint
3.1、Checkpoint检查点机制
Flink基于异步轻量级的分布式快照技术提供了Checkpoint容错机制,分布式快照可以把同一时间的Task/Operator的状态数据全局统一进行处理,包括前面提到的Keyed State和Operator State。
如图中所示,Flink会在输入的数据集当中间隔性地生成CheckPoint barrier,通过栅栏(barrier)将间隔时间段内的数据划分到相应的checkpoin当中,当flink程序发生故障时,也能够从最新的checkpoint当中恢复,从而保证数据的一致性。一致性问题涉及到端到端,从source-》transform-》sink来讲,transform不用多说,flink自己维护着checkpoint,自然是能够保证一致性的,关键在于source和sink,source拿kafka来说,在进行checkpoint的时候,会将当前消费kakfa的offset也保存在状态当中,这样当flink程序故障以后,进行恢复的时候,会从上一次checkpoint当中保存的offset来进行消费,从而避免了重复消费问题,sink是将数据持久化第三方的存储,由于第三方的多元性,这里就不一一介绍,一般都是通过联动事务性来处理的。
flink默认是不开启检查点的,用户需要在程序中通过调用enable-CheckPoint(n)的方法配置来开启检查点,n为检查点执行的时间间隔,单位是毫秒,除了配置时间间隔,针对检查点还可以调整其他相关参数:
tip:由于版本不同,具体的参数配置还需参考使用flink版本
(1)Checkpoint开启和是时间间隔指定
开启检查点并且指定检查点时间间隔为1000ms,根据实际情况自行选择,如果状态比较大,则建议适当增加该值。
env.enableCheckpointing(1000);
(2)exactly-once和at-least-once语义选择
exactly-once可以满足端到端的数据一致性问题,这种情况比较适合于对数据准确度要求比较高的场景,不允许出现重复数据和丢失数据,当然,flink的性能也会相对来讲差一些,还有一种就是at-least-once,至少一次的方式,这种形式可以会出现数据重复,适用于对吞吐量大,对性能要求高的场景,默认情况下是使用exactly-once。
env.getCheckpointConfig().setCheckpointMode(CheckpointintMode.EXACTLY_ONCE);
(3)Checkpoint超时时间
超时时间是指执行Checkpoint过程所用时间,一旦执行Checkpoint过程超过用户设定的阈值,就会按照超时处理。默认为10分钟。
env.getCheckpointConfig().setCheckpointTimeout(60000);
(4)检查点之间最小时间间隔
设置检查点之间的最小时间间隔,是为了防止Checkpoint过程时间过长,导致两次Checkpoint之间的时间间隔过短,从而导致Checkpoint积压过多,这样会占用大量计算资源而影响到整个应用的性能。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
(5)最大并行执行的检查点数据
最大并行执行的检查点数据用于设置最大同时执行的Checkpoint数量,在默认情况下只有一个检查点可以运行。
env.getCheckpintConfig().setMaxConcurrentCheckpoints(1);
(6)外部检查点
设定周期性的外部检查点,然后将状态数据持久化到外部系统中,使用这种方式不会再任务正常停止的过程中清理掉检查点数据,而且会一直保存在外部系统介质中,另外也可以通过外部介质来进行故障恢复。
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
(7)failOnCheckpointingErrors
failOnCheckpointingErrors参数决定了当Checkpoint执行过程中出现失败或者错误时,任务是否同时被关闭,默认值是True。
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
3.2、Savepoint机制
Savepoint其实是检查点的一种特殊实现,底层还是使用Savepoint的机制,Savepoint是用户以手工命令的方式触发Checkpoint,并将结果持久化到指定的存储路径中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免在用户在升级和维护集群过程中,丢失了算子的状态数据,从而无法保证端到端的 exactly-once语义保证。
Operator ID往往会和Savepoint机制配合着使用,当Savepoint对整个集群进行升级或运维操作的时候,需要停止整个Flink应用程序,此时用户可能会对应用的代码逻辑进行修改,即使flink能够通过Savepoint将应用中的状态数据同步到磁盘然后进行恢复任务,很可能由于代码逻辑发生了变化,在升级过程中有可能导致算子的状态无法通过Savepoint中的数据进行数据恢复的情况。在这种情况下需要通过唯一的ID标记算子,这样子在进行Savepoint恢复的时候,可以通过使用Operator ID,将算子唯一区分出来。
四、状态管理器
4.1、MemoryStateBackend
之前有提到过flink将状态数据需要进行保存和管理,不同保存方式就对应着不同的状态管理器。基于内存的状态管理器将状态数据全部存储在JVM堆内存当中,基于内存的特点当然就是非常高速和高效,但是也有缺点,每台执行机的内存都是有限的,当保存的状态数据过大的话,很可能会导致内存溢出,并且当执行机出现问题时,整个主机内存中的状态数据都会丢失。需要配合Checkpoint使用。
4.2、FsStateBackend
和MemoryStateBackend不同的是,FsStateBackend是一种基于文件系统的一种状态管理器,这里的文件系统可以是本地文件系统,也可以HDFS分布式文件系统。
new FsStateBackend(path,false);
path指的是路径,第二个参数是Boolean类型的参数,指定是否以同步的方式进行状态数据记录,默认采用异步的方式将状态数据同步到文件系统中,异步的方式能够尽量避免在Checkpoint的过程中影响到流计算任务。异步的方式存在状态数据丢失的情况,这一点也很容易理解,因为flink没有等返回写入成功就继续执行后续的计算了。将第二个参数设置为True,表示的就是同步了,同步相当于透写,状态数据被写入成功以后返回写成功,才会继续执行后续计算。
FsStateBackend比较适用于状态数据非常的大,不适合在内存当中保存的场景,并且借助第三方的文件系统,例如HDFS,其本身对数据的保存就有容错性,进一步的提升了状态数据的可靠性。但是使用FsStateBackend这个管理器,性能就不如MemoryStateBackend了。
4.2、RocksDBStateBackend
RocksDBStateBackend是Flink中内置的第三方状态管理器。RocksDBStateBackend采用异步的方式对状态数据进行快照,状态数据首先被写进RockDB当中,然后在异步写入文件系统当中,这样在RockDB当中仅会存储正在进行计算的热数据,对于长时间才更新的数据会被写入磁盘当中进行存储。
RocksDBStateBackend的性能介于MemoryStateBackend和FsStateBackend之间。