Flink是如何调度Job的,以及如何在JobManager上表现并跟踪Job状态
调度
Flink 通过任务槽(Task slot)定义执行资源,每个 TaskManager 都有一或多个任务槽,每个任务槽都可以运行一个并行任务流,一个 pipeline 包括多个连续的任务,例如一个 MapFunction 的第n个并行实例与一个 ReduceFunction 的第n个并行实例的连续任务。Flink 通常会并发执行连续的任务,对于流式程序来说,任何情况都如此执行;而对于批处理程序,多数情况也如此执行。
参照下图说明,由一个 Data source、一个 MapFunction 和一个 ReduceFunction 组成的程序,Data source 和 MapFunction 的并发度都为4,而 ReduceFunction 的并发度为3。一个 pipeline 由 Source-Map-Reduce 组成,在具有2个 TaskManager,每个 TaskManager 有3个 Task slot 的集群上运行,程序执行情况下:
说明如下:
- TaskManager 1上,有2个并行的 ExecutionVertex 组成的DAG图,各占用一个 Task slot
- TaskManager 2上,有2个并行的 ExecutionVertex 组成的 DAG 图,各占用一个Task Slot
- 在2个 TaskManager 上运行的4个 Execution 是并行执行的
Flink 内通过 SlotSharingGroup 和 CoLocationGroup 来定义哪些任务共享一个 Task slot,哪些任务必须严格的放到一个 Task slot中。
JobManager数据结构
作业执行期间,JobManager 会持续跟踪分布式任务,决定什么时候调度下一个 Task(或者一组任务),并且对已完成的或执行失败的任务进行响应。
JobManager 接收 JobGraph,JobGraph 是数据流的表现形式,包括算子(JobVertex)和中间结果(IntermediateDataSet)。每个算子都有诸如并行度和执行代码等属性。此外,JobGraph 还拥有一些在算子执行代码时所需要的附加库。
JobManager 将 JobGraph 转换为 ExecutionGraph,ExecutionGraph 是 JobGraph 的并行版本:对每个 JobVertex,包含并行子任务的 ExecutionVertex。一个并行度为100的算子将拥有一个 JobVertex 和100个 ExecutionVertex。ExecutionVertex 会跟踪特定子任务的执行状态。来自一个 JobVertex 的所有 ExecutionVertex 都由一个 ExecutionJobVertex 管理保存,ExecutionJobVertex 跟踪算子整体状态。除了各个节点之外,ExecutionGraph 同样包括了 IntermediateResult 和 IntermediateResultPartition,前者跟踪 IntermediateDataSet 的状态,后者跟踪每个分区的状态。
每个 ExecutionGraph 都有一个与其相关联的作业状态。此作业状态指示作业执行的当前状态。
作业首先处于创建状态(created),然后切换到运行状态(running),并且在完成所有工作后,它将切换到完成状态(finished)。在失败的情况下,作业首先切换到失败状态(failing),取消所有正在运行任务。如果所有节点都已达到最终状态,并且作业不可重新启动,则状态将转换为失败(failed)。如果作业可以重新启动,那么它将进入重新启动状态(restarting)。一旦完成重新启动,它将变成创建状态(created)。
在用户取消作业的情况下,将进入取消状态(cancelling),会取消所有当前正在运行的任务。一旦所有运行的任务已经达到最终状态,该作业将转换到已取消状态(canceled)。
完成状态(finished),取消状态(canceled)和失败状态(failed)表示一个全局的终结状态,并且触发清理工作,而暂停状态(suspended)仅处于本地终止状态。意味着作业的执行在相应的 JobManager 上终止,但集群的另一个 JobManager 可以从持久的HA存储中恢复这个作业并重新启动。因此,处于暂停状态的作业将不会被完全清理。
在执行 ExecutionGraph 期间,每个并行任务经过多个阶段,从创建(created)到完成(finished)或失败(failed) ,下图说明了它们之间的状态和可能的转换。任务可以执行多次(例如故障恢复)。每个 Execution 跟踪一个 ExecutionVertex 的执行,每个 ExecutionVertex 都有一个当前 Execution(current execution)和一个前驱 Execution(prior execution)。
补充:Flink 执行图
在 Flink 中的执行图可以分为四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图
- StreamGraph:Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。可以调用 env.getExecutionPlan() 输出json串,将该 JSON 串粘贴到 http://flink.apache.org/visualizer/ 可视化该执行图。
- JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
- ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。
- 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
四层执行图的演变过程如下图所示(来源:Flink 原理与实现:架构和拓扑概览):
Reference
https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/job_scheduling.html
http://wuchong.me/blog/2016/05/03/flink-internals-overview/