在流计算中,事件会持续不断的产生,如果每次计算都是相互独立的,不依赖于上下游事件,则称为无状态 (stateless) 计算;如果每次的计算依赖于之前或者后续的事件,则称为有状态 (stateful) 计算。Flink 中的状态用 State 抽象,用来保存中间计算结果或者缓存数据,State 是 Flink 内部实现 Exactly-Once的基础。
状态类型
和 redis 类似,Flink 按照数据类型的不同,定义了多种 State 接口,具体如下:
-
ValueState<T>
单值状态,与数据的 key 绑定;提供了 update(T value) 方法更新值,value() 方法获取状态值。
-
ListState<T>
Key 上的状态值为一个 List;提供了 add ,get 方法来分别增加和获取数据。
-
MapState<UK, UV>
Key 上的状态值为一个 Map;提供了 put,putAll, get 方法来增加和获取数据。
-
ReducingState<T>
实现的 ReduceFunction 中使用的 state, 在 reduce 方法之前,会先调用 ReducingState 的 add 方法,reduce 方法中的第一个参数就是状态值。
-
AggregatingState<IN, OUT>
聚合 state; 在 AggregateFunction 的 add 方法调用之前,会先调用 AggregatingState 的 add 方法,传入 acc。
按照算子是否有 key, Flink State 又被划分为 KeyedState 和 OperatorState。
类型 | state |
---|---|
KeyedState | ValueState<br/ >ListState<br />ReducingState<br />AggregationState<br />MapState<br /> |
OperatorState | ListState |
今天我们重点讲 KeyedState 里的最简单的 ValueState 的实现。KeyedState,顾名思义,要与 key 绑定,即只能用在 KeyedStream 流之后;每一个 key 对应一个 State 值;Key 除了有分区的作用,在状态管理当中,它还用于计算 keyGroupIndex:
// AbstractStreamOperator.java line:463
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
if (selector != null) {
Object key = selector.getKey(record.getValue());
setCurrentKey(key);
}
}
// StreamOperatorStateHandler.java line:281
public void setCurrentKey(Object key) {
if (keyedStateBackend != null) {
try {
// need to work around type restrictions
@SuppressWarnings("unchecked,rawtypes")
AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;
rawBackend.setCurrentKey(key);
} catch (Exception e) {
throw new RuntimeException("Exception occurred while setting the current key context.", e);
}
}
}
// 接着看 AbstractKeyedStateBackend.java line:172
public void setCurrentKey(K newKey) {
notifyKeySelected(newKey);
this.keyContext.setCurrentKey(newKey); this.keyContext.setCurrentKeyGroupIndex(KeyGroupRangeAssignment.assignToKeyGroup(newKey,numberOfKeyGroups));
}
从上面可以看出, 先设置 key, 再设置 keyGroupIndex,具体 keyGroupIndex 的作用是什么,我们后面会讲。
状态描述
State 是暴露给用户的接口,那么就需要指定状态的一些属性,如 name, type, ttl 等。Flink 中用 StateDescriptor 来描述一个状态,在对应的 StateBackend (状态后端) 中,调用 create 方法得到对应的 State 对象。下面以一个简单的 demo 来演示。
private transient ValueState<Tuple2<Integer, Integer>> state;
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Tuple2<Integer, Integer>> descriptor =
new ValueStateDescriptor<>("average", TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}));
state = getRuntimeContext().getState(descriptor);
}
我们顺着方法调用链走下去,跳过中间一些无关的代码,直接看 TtlStateFactory 的 createStateAndWrapWithTtlIfEnabled 方法:
public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, SV> stateDesc,
KeyedStateBackend<K> stateBackend,
TtlTimeProvider timeProvider) throws Exception {
...
// 是否为状态设置了 ttl,如果设置 ttl,使用TtlStateFactory创建state,如果没有,直接调用 stateBackend创建state
return stateDesc.getTtlConfig().isEnabled() ?
new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
namespaceSerializer, stateDesc, stateBackend, timeProvider)
.createState() :
stateBackend.createInternalState(namespaceSerializer, stateDesc);
}
上面两个创建 state 的不同分支区别就是是否设置了 ttl,其它的基本一样。接着看 stateBackend.createInternalState 方法的实现
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
if (stateFactory == null) {
String message = String.format("State %s is not supported by %s",
stateDesc.getClass(), this.getClass());
throw new FlinkRuntimeException(message);
}
// 这里是状态实现原理的重点, 状态的获取/更新都是通过 statetable 操作的,我们重点看下里面的实现
StateTable<K, N, SV> stateTable = tryRegisterStateTable(
namespaceSerializer, stateDesc, getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory));
// 根据状态描述的类型,调用对应的构造方法。
return stateFactory.createState(stateDesc, stateTable, getKeySerializer());
}
StateTable 是状态实现原理的重点, 状态的获取/更新都是通过 statetable 操作的,我们重点看下里面的实现。
private <N, V> StateTable<K, N, V> tryRegisterStateTable(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<?, V> stateDesc,
@Nonnull StateSnapshotTransformFactory<V> snapshotTransformFactory) throws StateMigrationException {
@SuppressWarnings("unchecked")
StateTable<K, N, V> stateTable = (StateTable<K, N, V>) registeredKVStates.get(stateDesc.getName());
TypeSerializer<V> newStateSerializer = stateDesc.getSerializer();
if (stateTable != null) {
// ...
} else {
RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
stateDesc.getType(),
stateDesc.getName(),
namespaceSerializer,
newStateSerializer,
snapshotTransformFactory);
// 得到 statetable对象。启用 statetable的构造方法。初始化size为128(默认最大并行度)的 StateMap数组
stateTable = snapshotStrategy.newStateTable(keyContext, newMetaInfo, keySerializer);
registeredKVStates.put(stateDesc.getName(), stateTable);
}
return stateTable;
}
StateTable 里提供的 get, put 操作实现了对状态值的 获取和更新,StateTable是一个抽象类,如下:
NestedMapsStateTable 使用两层嵌套的 HashMap 保存状态数据,支持同步快照;CopyOnWriteStateTable 使用 CopyOnWriteStateMap 来保存状态数据,支持异步快照。下面是 CopyOnWriteStateTable里的 put 和 get 方法的实现
private S get(K key, int keyGroupIndex, N namespace) {
checkKeyNamespacePreconditions(key, namespace);
// stateMap数组默认size为128,keyGroupIndex为key的hash与128取模得到
// StateMap内部封装了MapEntry,类似于 HashMap,基于链表加数组的实现
StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroupIndex);
if (stateMap == null) {
return null;
}
// 从map中得到state value
return stateMap.get(key, namespace);
}
public void put(K key, int keyGroup, N namespace, S state) {
checkKeyNamespacePreconditions(key, namespace);
// stateMap数组默认size为128,keyGroupIndex为key的hash与128取模得到
StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroup);
stateMap.put(key, namespace, state);
}
上面代码比较简单明了,也就不用多做解释。
总结
- StateDescribe 持有状态的描述, StateBackend 通过它来创建 State 对象
- State 对象里封装了 StateTable,StateTable 负责对State 做 snapshot 到对应的 StateBackend。
- StateTable 里封装了 StateMap,为存储 state 的内存介质,负责状态的更新/新增/获取 (基于内存)。
- StateMap 在 StateTable 中为一个数组,默认 size 为最大并行度 128,所以会存在不同 key 对应一个StateMap对象。里面的实现类似于 HashMap,为链表加数组的数据结构。