本文仅为笔者平日学习记录之用,侵删
原文:https://mp.weixin.qq.com/s/GuA9o09EEue66fEpGgoGaQ
本文是 Flink 源码解析系列,通过阅读本文你能 get 到以下点:
- StreamTask 类的基本功能及其职责
- StreamTask 初始化详细流程
- StateBackend 与 keyedStateBackend 以及 operatorStateBackend 之间的关系
这里先留一个思考题:如下代码所示,开发 Flink Job 时 dataStream keyBy 后连续跟 map、filter、flatMap 三个算子,请问这三个自定义的 Function 内都可以使用 Flink 的 ValueState 吗?
dataStream.keyby(_._1)
.map(new MyMapFunction())
.filter(new MyFilterFunction())
.flatMap(new MyFlatMapFunction())
一、 StreamTask 介绍
Flink 中数据的整个处理流程都是围绕 StreamTask 来做的,所以先介绍一下 StreamTask 这个类。StreamTask 类的 doc 如下所示:
注释的大概意思是:StreamTask 是所有 Streaming Task 的基类,是由 TM 部署并执行的本地处理单元。每个 StreamTask 运行一个或多个 Chain 在一起的 StreamOperator,这些 Operator 将会在一个线程内同步执行。常见的 case:map、flatmap、filter 三个算子连续的算子。
通俗的讲,StreamTask 就是对应一个 subtask 实例。如下图 Job 的 ExecutionGraph 所示,Source 算子和 map 算子 Chain 在一起,组成一个 OperatorChain,所以这两个算子运行在一个 subtask 里,同时这两个算子的并行度为 2,所以在对应两个 subtask。图中后续的算子也是类似,图中任务如果运行起来,就会对应 5 个 subtask,也就是对应 5 个 StreamTask。
从资源角度讲,每个 TaskManager 内部有多个 slot,每个 slot 内部运行着一个 subtask,也就是说每个 slot 内部运行着一个 StreamTask。
看完这个案例,再回顾一遍源码中注释,应该比较容易理解了:
- StreamTask 是由 TM 部署并执行的本地处理单元
- 每个 StreamTask 运行一个或多个 Chain 在一块的 StreamOperator,即:Source 算子和 map 算子就是 Chain 在一起的 Operator
- 这些 Operator 将会在一个线程内同步的执行。即:线程中 Source 算子和 map 算子不能同时执行。
二、StreamTask 职责简介
如源码注释所示,StreamTask 的生命周期如下所示:
简单概括分为三个阶段:初始化、run、close。
初始化阶段包括:Operator 的配置、task 特定的初始化、初始化算子的 State、open-operators。
做 Flink 开发的同学应该都知道:自定义一个 Function 时可以实现 RichFunction,实现 open 方法,然后 Job 启动时,就会调用 open 方法做一些初始化操作。
open-operators 指的是 StreamTask 在初始化阶段,会调用所有实现了 RichFunction 算子 的 open 方法。
run 阶段:主要就是数据处理了。
close 阶段:做一些关闭操作,例如调用算子的 close 方法等,并做一些清理工作。
三、StreamTask 的初始化
StreamTask 的整个流程都在 invoke 方法中,直接从 invoke 方法开始分析。invoke 方法就是上面介绍的三个阶段:初始化、run、close。初始化阶段做了很多事情,有些直接略过了(例如:创建线程池等),当然初始化阶段重要的操作会深入分析。
invoke 中初始化相关的代码做了部分删减,如下所示:
asyncOperationsThreadPool = Executors.newCachedThreadPool(new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));
// 创建 StateBackend, 优先从 app 的设置中去加载,再去 config 中去加载,
// 都没有配置,则创建默认的 MemoryStateBackend
stateBackend = createStateBackend();
checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());
operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();
// task 特定的初始化,例如 当前 StreamTask 有 input 的情况下,会初始化 inputProcessor
init();
synchronized (lock) {
// 循环遍历,对该 task 所有 Operator 进行状态初始化,
// 包括初始化 StateBackend ,并调用 udf 的 initializeState 方法
initializeState();
openAllOperators();
}
初始化部分代码较多,下面主要介绍几部分:
- createStateBackend
- init
- initializeState();
- openAllOperators();
1. createStateBackend
见名之意,该方法就是创建 StateBackend,Flink 目前支持三种 StateBackend:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。该方法决定了当前 Job 具体要创建哪种 StateBackend。
源码如下:
// (1) the application defined state backend has precedence
// 代码中创建了 StateBackend,则按照代码中配置来
if (fromApplication != null) {
// see if this is supposed to pick up additional configuration parameters
if (fromApplication instanceof ConfigurableStateBackend) {
// needs to pick up configuration
if (logger != null) {
logger.info("Configuring application-defined state backend with job/cluster config");
}
backend = ((ConfigurableStateBackend) fromApplication).configure(config, classLoader);
}
else {
// keep as is!
backend = fromApplication;
}
}
else {
// (2) check if the config defines a state backend
// 代码中没有配置,按照 配置文件来:即按照 flink-conf.yaml 文件中的配置来
final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger);
if (fromConfig != null) {
backend = fromConfig;
}
else {
// (3) use the default
// 代码中没有配置,配置文件也没有配置,则创建默认的 MemoryStateBackend
backend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
}
}
整体流程比较简单:
- 代码中创建了 StateBackend,则按照代码中配置来
- 代码中没有配置,按照 配置文件来:即按照 flink-conf.yaml 文件中的配置来
- 代码中没有配置,配置文件也没有配置,则创建默认的 MemoryStateBackend
2. init
init 运行 task 特定的初始化,例如当前 StreamTask 有 input 的情况下,会初始化 inputProcessor 读取并处理数据。关于 inputProcessor 会在 run 部分重点介绍,这里先略过。
3. initializeState
initializeState 方法源码:
// 循环遍历,对该 task 所有 Operator 进行状态初始化,
// 包括初始化 StateBackend ,并调用 udf 的 initializeState 方法
private void initializeState() throws Exception {
StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
for (StreamOperator<?> operator : allOperators) {
if (null != operator) {
operator.initializeState();
}
}
}
源码逻辑比较简单:直接调用当前 StreamTask 的 operatorChain 中所有 StreamOperator 的 initializeState 方法。假如当前 operatorChain 包含了 MapFunction、FilterFunction,两个算子将会被封装在 StreamMap 和 StreamFilter 中,那么此时就会调用这两个算子所对应的 StreamOperator 的 initializeState 方法,根据继承,最后调用的是 AbstractStreamOperator 的无参 initializeState() 方法。
这里专门强调无参 initializeState() 方法,是因为 AbstractStreamOperator 中还有一个有参的 initializeState(StateInitializationContext context) 方法,不要混淆。
注:有参的 initializeState 方法参数类型较长,下文将缩写为 initializeState(context) ;无参的 initializeState 方法继续用 initializeState() 表示。
AbstractStreamOperator 类的 initializeState() 方法介绍
initializeState 方法的简洁版源码如下:
public final void initializeState() throws Exception {
final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
// 创建 StreamTaskStateInitializerImpl
final StreamTaskStateInitializer streamTaskStateManager =
Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
// 使用 StreamTaskStateInitializerImpl 初始化
// 各种 operatorStateBackend 和 keyedStateBackend,
// 并从 Checkpoint 处恢复 State
final StreamOperatorStateContext context =
streamTaskStateManager.streamOperatorStateContext(XXX);
try {
// new Context
StateInitializationContext initializationContext = new
StateInitializationContextImpl(XXX);
/**
* 重点关注 AbstractUdfStreamOperator,它重写了 initializeState(context) 方法,
* 去真正调用 各个 udf 的 initializeState 方法,
*/
initializeState(initializationContext);
} finally {
XXX
}
}
initializeState 方法主要完成两个工作:
- 1、初始化 KeyedStateBackend 和 OperatorStateBackend,并从 Checkpoint 处恢复 State
- 2、如果封装了 udf,则调用 udf 的 initializeState 方法(前提是 userFunction 实现了 CheckpointedFunction 接口)
源码流程:创建 StreamTaskStateInitializer。StreamTaskStateInitializer 只有一个实现类:StreamTaskStateInitializerImpl,所以会创建 StreamTaskStateInitializerImpl。创建时,将之前初始化好的 StateBackend 传递给 StreamTaskStateInitializerImpl,然后调用 streamOperatorStateContext 方法初始化 KeyedStateBackend 和 OperatorStateBackend。
下面重点关注 StreamTaskStateInitializerImpl 类的 streamOperatorStateContext 方法,源码如下所示:
public StreamOperatorStateContext streamOperatorStateContext(XXX){
// -------------- 初始化 Keyed State Backend --------------
keyedStatedBackend = keyedStatedBackend(XXX);
// -------------- 初始化 Operator State Backend --------------
operatorStateBackend = operatorStateBackend(XXX);
// -------------- Raw State 相关操作 --------------
rawKeyedStateInputs = rawKeyedStateInputs(XXX);
streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
rawOperatorStateInputs = rawOperatorStateInputs(XXX);
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
// -------------- Internal Timer Service Manager --------------
timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);
// -------------- Preparing return value --------------
return new StreamOperatorStateContextImpl(
prioritizedOperatorSubtaskStates.isRestored(),
operatorStateBackend,
keyedStatedBackend,
timeServiceManager,
rawOperatorStateInputs,
rawKeyedStateInputs);
}
首先会初始化 Keyed State Backend 和 Operator State Backend,Flink 还支持 Raw 类型的 State,基本用不到,除非 Flink 内的 Managed State 不能满足作业的需求。重点关注 Keyed State Backend 和 Operator State Backend 的初始化。keyedStatedBackend 方法用于初始化 keyedStatedBackend,operatorStateBackend 方法用于初始化 operatorStateBackend。
StateBackend 与 keyedStateBackend 以及 operatorStateBackend 之间的关系
介绍初始化源码之前,先介绍一下 StateBackend 与 keyedStateBackend 以及 operatorStateBackend 之间的关系。我们都知道 Flink 中支持两种类型的 State,即:KeyedState 和 OperatorState;Flink 目前支持三种状态后端存储,即:Memory、Fs 和 RocksDB。所以 StateBackend 有三种实现即:MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。每种 StateBackend 都要支持 KeyedState 和 OperatorState,所以每种 StateBackend 要负责创建出自己相应的 keyedStateBackend 以及 operatorStateBackend。具体 KeyedState 与存储系统如何交互是由 keyedStateBackend 完成的,具体 OperatorState 与存储系统如何交互是由 operatorStateBackend。例如 RocksDBStateBackend 会创建出 RocksDBKeyedStateBackend,每个 RocksDBKeyedStateBackend 会持有 RocksDB 数据库实例,然后 Flink 引擎就可以与 RocksDB 进行交互了。
简言之:根据用户配置创建出不同类型的 StateBackend,然后不同的 StateBackend 再创建出对应的 keyedStateBackend 以及 operatorStateBackend,keyedStateBackend 和 operatorStateBackend 会真正的存储状态数据。
每种 StateBackend 到底会创建出哪种 keyedStateBackend 和哪种 operatorStateBackend 呢?这里引用 Flink 社区分享的图:
图中可以看出,Memory 和 Fs 会创建出 HeapKeyedStateBackend,RocksDB 会创建出 RocksDBKeyedStateBackend。无论哪种 StateBackend,都会使用 DefaultOperatorStateBackend。这里也验证了一点:RocksDB 数据库中只会存储 KeyedState,不会存储 OperatorState。因为 RocksDBStateBackend 对应的 OperatorState 的存储也是基于内存的。读到这里,读者应该理解 StateBackend 与 keyedStateBackend 以及 operatorStateBackend 之间的关系了。
初始化 keyedStateBackend 流程
下面重点关注初始化 keyedStateBackend 的流程,keyedStatedBackend 方法源码如下所示:
protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(XXX){
// 如果不是 KeyedStream 直接就返回,即:不创建 keyedStatedBackend
if (keySerializer == null) {
return null;
}
// 计算当前 subtask 负责的 KeyGroupRange
final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
taskInfo.getMaxNumberOfParallelSubtasks(),
taskInfo.getNumberOfParallelSubtasks(),
taskInfo.getIndexOfThisSubtask());
BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle>
backendRestorer =
new BackendRestorerProcedure<>(
// 这里是函数式接口,并不是去 create KeyedStateBackend
(stateHandles) -> stateBackend.createKeyedStateBackend(XXX),
backendCloseableRegistry,
logDescription);
try {
// 这里去 create StateBackend 并恢复状态文件
return backendRestorer.createAndRestore(
// 获取 StateHandle 的集合
prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
} finally {
if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {
IOUtils.closeQuietly(cancelStreamRegistryForRestore);
}
}
}
可以看到方法第一行:if (keySerializer == null) {return null;} 表示如果不是 KeyedStream 直接就返回,即:不创建 keyedStatedBackend。计算当前 subtask 负责的 KeyGroupRange,然后创建 BackendRestorerProcedure 类型的 backendRestorer,且将 (stateHandles) -> stateBackend.createKeyedStateBackend(XXX) 传递给 BackendRestorerProcedure 构造器的 instanceSupplier,instanceSupplier 是一个函数式接口,用于创建 KeyedStateBackend。
backendRestorer.createAndRestore 方法会循环调用 attemptCreateAndRestore 恢复一个个 State, attemptCreateAndRestore 方法中调用 instanceSupplier(函数式接口)真正的创建 keyedStatedBackend。
instanceSupplier 函数式接口的工作:调用相应 StateBackend 的 createKeyedStateBackend 方法创建 AbstractKeyedStateBackend。如果 stateBackend 是 RocksDBStateBackend,就会创建出 RocksDBKeyedStateBackend。如果是 Memory 或 Fs 则会创建出 HeapKeyedStateBackend。在创建完 KeyedStateBackend 的过程中,会从 Checkpoint 中恢复状态到 Flink 引擎。
注:具体 KeyedStateBackend 恢复状态的流程比较复杂,每种 StateBackend 的恢复流程都不同,同时还牵扯到从 dfs 中拉取状态数据用于恢复,所以后续会有单独的博客介绍恢复流程。
初始化 operatorStateBackend 流程
operatorStateBackend 方法用于初始化 operatorStateBackend。operatorStateBackend 初始化流程与 keyedStateBackend 比较类似,区别在于最后调用的是 stateBackend.createOperatorStateBackend()。
三种 StateBackend 的 createOperatorStateBackend 方法非常相似,源码如下:
public OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier,
@Nonnull Collection<OperatorStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry) throws Exception {
return new DefaultOperatorStateBackendBuilder(
env.getUserClassLoader(),
env.getExecutionConfig(),
isUsingAsynchronousSnapshots(),
stateHandles,
cancelStreamRegistry).build();
}
这里无论是何种 StateBackend,都会创建出 DefaultOperatorStateBackend。也就验证了一点:RocksDB 只支持 KeyedState,OperatorState 都是按照 Heap 的方案。
具体 new DefaultOperatorStateBackend 的过程由建造器 DefaultOperatorStateBackendBuilder 完成,build 的功能是创建出 OperatorStateBackend,并从 Checkpoint 中将 State 恢复到 Flink 引擎端。整体流程比较复杂,这里不阐述会在后面博客中单独介绍。
如果当前是 udf,则调用 udf 的 initializeState 方法
接下来重点又回到了 AbstractStreamOperator 类的 initializeState() 方法中,根据创建好的 operatorStateBackend 和 keyedStateStore 构造 Context。然后调用 initializeState(Context) 方法,之前说过要区分有参和无参的 initializeState 方法,现在执行到了有参的 initializeState(Context) 方法。
前面介绍过所有自定义的 UDF 都被包装起来,例如 MapFunction 都被 StreamMap 类包装起来,且这些 UDF 的包装类都继承自 AbstractUdfStreamOperator,AbstractStreamOperator 类的 initializeState(Context) 方法没有任何实现,这里重点关注 AbstractUdfStreamOperator 重写的 initializeState(Context) 方法。
AbstractUdfStreamOperator 类的 initializeState 方法源码:
@Override
public void initializeState(StateInitializationContext context) throws Exception {
// super 表示 AbstractStreamOperator 类
super.initializeState(context);
// 调用 udf 的 initializeState 方法
StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}
super 表示 AbstractStreamOperator 类,即调用 AbstractStreamOperator 类的 initializeState(context) 空方法,重点在于工具类 StreamingFunctionUtils 的 restoreFunctionState(context, userFunction) 方法,restoreFunctionState 方法内会对包装的 udf 进行解包装,然后执行 tryRestoreFunction 方法。
tryRestoreFunction 方法部分源码如下所示:
private static boolean tryRestoreFunction(
StateInitializationContext context,
Function userFunction) throws Exception {
// 调用 udf 的 initializeState 方法,
// 前提是 userFunction instanceof CheckpointedFunction
if (userFunction instanceof CheckpointedFunction) {
((CheckpointedFunction) userFunction).initializeState(context);
return true;
}
}
tryRestoreFunction 会对 userFunction 进行判断,如果实现了 CheckpointedFunction 接口,就调用 userFunction 的 initializeState(context) 对状态进行初始化。使用过 CheckpointedFunction 接口的同学应该清楚:自定义的 Function 可以实现 CheckpointedFunction 接口,重写 initializeState 方法,做一些状态的初始化操作。例如在 initializeState 方法申请创建 OperatorState。
udf 使用 initializeState 的经典案例就是 FlinkKafkaConsumerBase 类,FlinkKafkaConsumerBase 类实现了 CheckpointedFunction 接口,在 initializeState 方法中定义了 OperatorState 类型的 ListState,将 Flink 消费 Kafka 的 offset 信息维护在 ListState 中。每次启动任务时,都会从 ListState 中恢复之前的 offset,并从 offset 处继续消费。
initializeState 小结
initializeState 过程比较复杂,总的来说就两个事情:
- 1、 创建相应的 keyedStateBackend 和 OperatorStateBackend,并从 Checkpoint 处恢复 State(具体恢复流程后续讲述)
- 2、 如果 udf 实现了 CheckpointedFunction 接口,则调用 udf 的 initializeState 方法
4. openAllOperators
此时回到了 StreamTask 初始化流程的下一步:openAllOperators。openAllOperators 方法比较简单,源码如下所示:
private void openAllOperators() throws Exception {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
if (operator != null) {
operator.open();
}
}
}
openAllOperators 方法会调用 OperatorChain 中所有 StreamOperator 的 open 方法,通过继承关系,最后调用的仍然是 AbstractUdfStreamOperator 类的 open 方法。AbstractUdfStreamOperator 类的 open 方法源码如下所示:
// AbstractUdfStreamOperator 类的 open 方法
@Override
public void open() throws Exception {
super.open();
FunctionUtils.openFunction(userFunction, new Configuration());
}
// FunctionUtils 类的 openFunction 方法
public static void openFunction(Function function
, Configuration parameters) {
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
richFunction.open(parameters);
}
}
AbstractUdfStreamOperator 类的 open 方法调用 FunctionUtils 类的 openFunction 方法,openFunction 方法中会判断当前 userFunction 是否实现了 RichFunction 接口,如果实现了 RichFunction 接口,则调用 userFunction 的 open 方法。
openAllOperators 小结
openAllOperators 的流程比较简单,就是判断 userFunction 是否实现了 RichFunction 接口,在 Flink 中实现了 RichFunction 表示富函数,可以定义 open 和 close 相关的逻辑,在算子初始化或者关闭时会被调用。
四、 思考题
如下代码所示,开发 Flink Job 时 dataStream keyBy 后连续跟 map、filter、flatMap 三个算子,请问这三个自定义的 Function 内都可以使用 Flink 的 ValueState 吗?
dataStream.keyby(_._1)
.map(new MyMapFunction())
.filter(new MyFilterFunction())
.flatMap(new MyFlatMapFunction())
先说答案吧:在 MyMapFunction 中可以使用 ValueState,在 MyFilterFunction 和 MyFlatMapFunction 中不能使用 ValueState。如果在 MyFilterFunction 和 MyFlatMapFunction 中定义 ValueState 或 MapState,都会报错,会显示 keyedStateBackend 为 null。为什么呢?
首先 Flink 中只有 KeyedState 才支持 ValueState 和 MapState,OperatorState 不支持 ValueState 和 MapState。只要对 KeyedStream 的操作才能使用 KeyedState,KeyedState 表示相同的 key 共享同一个 State,普通的 DataStream 中没有 key 的概念不能使用 KeyedState。
DataStream 的 keyBy 方法源码如下所示,由源码 DataStream 可以看到,DataStream 的 keyBy 方法会返回 KeyedStream,KeyedStream 是 DataStream 的子类,KeyedStream 经过 map 转换后又会变成 DataStream。所以上图中只有 MyMapFunction 是基于 KeyedStream 操作的,MyFilterFunction 和 MyFlatMapFunction 都是基于 DataStream 操作的,没有 key 的概念,因此不能使用 KeyedState,即不能使用 ValueState。
// DataStream 的 keyBy 方法源码
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
Preconditions.checkNotNull(key);
return new KeyedStream<>(this, clean(key));
}
MyFilterFunction 和 MyFlatMapFunction 中定义 ValueState 或 MapState 时,为什么会报出 keyedStateBackend 为 null 呢?
回顾一下创建 keyedStateBackend 的流程,第一步就是 if (keySerializer == null) {return null;},如果不是 KeyedStream 直接就返回,即:不创建 keyedStatedBackend。所以出现了上述现象。
五、 总结
本文介绍了 StreamTask 类的基本功能,StreamTask 映射到 ExecutionGraph 中对应的是一个 subtask,每个 StreamTask 运行一个或多个 Chain 在一起的 StreamOperator,这些 Operator 将会在一个线程内同步执行。随后介绍了 StreamTask 的生命周期,主要包括了初始化、run、close 三个流程。后半部分重点描述了 StreamTask 初始化的过程,主要是:createStateBackend、init、initializeState()、openAllOperators() 四个过程。
后续会给大家详细介绍 initializeState 部分如何从 Checkpoint 中恢复 State,也会详细介绍 run 流程到底是如何处理一条条数据的。