有状态的函数和操作在处理各个元素或者事件时存储数据,使得state称为任何类型的复杂操作的关键构建部件,例如:
当一个应用程序搜索某些特定的事件模式时,状态会保存截止到目前为止遇到过的事件的顺序;
当每分钟聚合事件时,状态会保存挂起的聚合
当通过数据点来训练机器学习模型时,状态会保存当前版本的模型参数
为了使state容错,Flink需要识别state并checkpoint它, 在许多情况下,Flink还管理着应用程序的状态,这意味着Flink处理内存管理(如果需要,可能会将内存中的数据溢出到磁盘)来保存非常大的state。
这篇文档介绍了在开发应用程序时如何使用Flink的state 抽象概念。
键控State 和算子 State(Keyed State and Operator State)
在Flink中有两个基本的state:Keyed state和 Operator state
Keyed State
Keyed State
总是与key相关,并且只能应用于KeyedStream
的函数和操作中。
你可以认为Keyed State
是一个已经分区或者划分的,每个state分区对应一个key的Operator State
, 每个keyed-state
逻辑上与一个<并行操作实例, 键>(<parallel-operator-instance, key>
)绑定在一起,由于每个key属于唯一一个键控算子(keyed operator
)的并行实例,我们可以简单地看作是<operator, key>
。
Keyed State
可以进一步的组成Key Group
, Key Group
是Flink重新分配Keyed State
的最小单元,这里有跟定义的最大并行数一样多的Key Group
,在运行时keyed operator
的并行实例与key一起为一个或者多个Key Group
工作。
Operator State
使用Operator State
(或者非键控的state)的话,每个算子状态绑定到一个并行算子实例中。Kafka Connector
就是在Flink中使用Operator State
的一个很好的例子,每个Kafka consumer
的并行实例保存着一个topic
分区和偏移量的map作为它的Operator State
。
当并行数发生变化时,Operator State
接口支持在并行操作实例中进行重新分配,这里有多种方法来进行重分配。
原生的和托管的State(Raw and Managed State)
Keyed State和 Operator State存在两种形式:托管的和原生的。
托管的State(Managed State
)由Flink运行时控制的数据结构表示, 例如内部哈希表或者RocksDB,例子是"ValueSate", "ListState"等。Flink运行时会对State编码并将它们写入checkpoint中。
原生State(Raw State
)是算子保存它们自己的数据结构的state,当checkpoint时,它们仅仅写一串byte数组到checkpoint中。Flink并不知道State的数据结构,仅能看到原生的byte数组。
所有的数据流函数都可以使用托管state,但是原生state接口只能在实现operator时才能使用。使用托管State(而不是原生state)被推荐使用是因为使用托管state,当并行度发生变化时,Flink可以自动地重新分配state,同时还能更好地管理内存。
使用托管的键控state(Using Managed Keyed State)
托管的键控state接口可以访问所有当前输入元素的key范围内的不同类型的state,这也就意味着这种类型的state只能被通过stream.keyBy(...)创建的KeyedStream使用。
现在我们首先来看一下可用的不同类型的state,然后在看它们是如何在程序中使用的,可用State的原形如下:
ValueState<T>:这里保存了一个可以更新和检索的值(由上述输入元素的key所限定,所以一个操作的每个key可能有一个值)。这个值可以使用update(T)
来更新,使用T value()
来获取。
ListState<T>:这个保存了一个元素列表,你可以追加元素以及获取一个囊括当前所保存的所有元素的Iterable
,元素可以通过调用add(T)
来添加,而Iterable
可以调用Iterable<T> get()
来获取。
ReducingState<T>:这个保存了表示添加到state的所值的聚合的当个值,这个接口与ListState类似,只是调用add(T)添加的元素使用指定的ReduceFunction聚合成一个聚合值。
FoldingState<T, ACC>:这将保存表示添加到状态的所有值的聚合的单个值,与ReducingState相反,聚合的数据类型可能跟添加到State的元素的数据类型不同,这个接口与ListState类似,只是调用add(T)添加的元素使用指定的FoldFunction折叠成一个聚合值。
MapState<T>:这个保存了一个映射列表,你可以添加key-value对到state中并且检索一个包含所有当前保存的映射的Iterable。映射可以使用put(UK, UV)
或者putAll(Map<UK, UV>)
来添加。与key相关的value,可以使用get(UK)
来获取,映射的迭代、keys及values可以分别调用entries()
, keys()
和values()
来获取。
所有类型的state都有一个clear()
方法来清除当前活动的key(及输入元素的key)的State。
注意:FoldingState会在下一个Flink版本中启用,并在将来的版本中彻底删除,将提供更加一般的替代方案。
值得注意的是这些State对象仅用于与State进行接口,State并不只保存在内存中,也可能会在磁盘中或者其他地方,第二个需要注意的是从State中获取的值依赖于输入元素的key,因此如果涉及的key不同,那么在一次调用用户函数中获得的值可能与另一次调用的值不同。
为了获得一个State句柄,你需要创建一个StateDescriptor
,这个StateDescriptor
保存了state的名称(接下来我们会讲到,你可以创建若干个state,但是它们必须有唯一的值以便你能够引用它们),State保存的值的类型以及用户自定义函数如:一个ReduceFunction
。根据你想要检索的state的类型,你可以创建一个ValueStateDescriptor
, 一个ListStateDescriptor
, 一个ReducingStateDescriptor
, 一个FoldingStateDescriptor
或者一个MapStateDescriptor
State可以通过RuntimeContext
来访问,所以只能在富函数中使用。RichFunction
中的RuntimeContext
有以下这些方法来访问state:
ValueState<T> getState(ValueStateDescriptor<T>)
ReducingState<T> getReducingState(ReduceingStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
这个FlatMapFunction
例子展示了所有部件如何组合在一起:
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// access the state value
Tuple2<Long, Long> currentSum = sum.value();
// update the count
currentSum.f0 += 1;
// add the second field of the input value
currentSum.f1 += input.f1;
// update the state
sum.update(currentSum);
// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(0)
.flatMap(new CountWindowAverage())
.print();
// the printed output will be (1,4) and (1,5)
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
private var sum: ValueState[(Long, Long)] = _
override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
// access the state value
val tmpCurrentSum = sum.value
// If it hasn't been used before, it will be null
val currentSum = if (tmpCurrentSum != null) {
tmpCurrentSum
} else {
(0L, 0L)
}
// update the count
val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
// update the state
sum.update(newSum)
// if the count reaches 2, emit the average and clear the state
if (newSum._1 >= 2) {
out.collect((input._1, newSum._2 / newSum._1))
sum.clear()
}
}
override def open(parameters: Configuration): Unit = {
sum = getRuntimeContext.getState(
new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
)
}
}
object ExampleCountWindowAverage extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromCollection(List(
(1L, 3L),
(1L, 5L),
(1L, 7L),
(1L, 4L),
(1L, 2L)
)).keyBy(_._1)
.flatMap(new CountWindowAverage())
.print()
// the printed output will be (1,4) and (1,5)
env.execute("ExampleManagedState")
}
这个例子实现了一个简单的计数器,我们使用元组的第一个字段来进行分组(这个例子中,所有的key都是1),这个函数将计数和运行时总和保存在一个ValueState中,一旦计数大于2,就会发出平均值并清理state,因此我们又从0开始。请注意,如果我们在第一个字段中具有不同值的元组,则这将为每个不同的输入键保持不同的state值。
Scala DataStream API 中的 State
除了上述接口之外,Scala API还具有快捷方式在KeyedStream上通过有状态的map()
或flatMap()
函数获取当个ValueState, 用户定义的Function以一个Option形式来获取ValueState的当前值,并且必须返回一个更新的值来更新State。
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
.keyBy(_._1)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ( (in._1, c), Some(c + in._2) )
case None => ( (in._1, 0), Some(in._2) )
})
使用托管算子State
为了使用托管的算子State,有状态的函数可以实现更加通用的CheckpointedFunction接口或者ListCheckpoint<T extends Serializable>接口
CheckpointedFunction
CheckpointedFunction接口可以通过不同的重分区模式来访问非键控的state,它需要实现两个方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
无论何时执行checkpoint,snapshotState()
都会被调用,相应地,每次初始化用户定义的函数时,都会调用对应的initializeState()
,当函数首次初始化时,或者当该函数实际上是从较早的检查点进行恢复时调用的。鉴于此,initializeState()
不仅是不同类型的状态被初始化的地方,而且还是state恢复逻辑的地方。
目前列表式托管算子状态是支持的,State要求是一个可序列化的彼此独立的列表,因此可以在重新调整后重新分配,换句话说,这些对象是可重新分配的非键控state的最小粒度。根据状态的访问方法,定义了一下重分配方案:
Even-split redistribution:每个算子返回一个State元素列表,
Union redistribution:每个算子返回一个State元素列表,