Flink Source Code Analysis Series
Please see: Flink 源码分析系列文档目录
Preface
In this article we analyze the execution flow of an entire Flink job, starting from the entry point of typical Flink user code.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// user logic ...
env.execute("XXX Job");
Creating the ExecutionEnvironment
The first step of Flink user code is usually to obtain the execution environment.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
We start the analysis from the getExecutionEnvironment method.
public static StreamExecutionEnvironment getExecutionEnvironment() {
return getExecutionEnvironment(new Configuration());
}
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(factory -> factory.createExecutionEnvironment(configuration))
.orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
The Utils.resolveFactory method takes two arguments: a factory held in a ThreadLocal and a static factory. If the ThreadLocal contains a factory it is returned; otherwise the static factory is returned.
If neither factory yields an ExecutionEnvironment, StreamExecutionEnvironment.createLocalEnvironment(configuration) is called. It creates a LocalStreamEnvironment, which means local execution mode: all tasks run in the same JVM.
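A minimal sketch of this resolution logic (a hypothetical simplification, not the verbatim Flink code):
// Sketch only: prefer the thread-local factory, fall back to the static one.
public static <T> Optional<T> resolveFactory(ThreadLocal<T> threadLocalFactory, T staticFactory) {
    final T localFactory = threadLocalFactory.get();
    final T factory = localFactory == null ? staticFactory : localFactory;
    return Optional.ofNullable(factory);
}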
If the user submits the Flink job to a cluster from the command line, the program goes through the following calls:
CliFrontend.executeProgram
ClientUtils.executeProgram
StreamContextEnvironment.setAsContext
StreamExecutionEnvironment.initializeContextEnvironment
Through these calls a StreamExecutionEnvironmentFactory is created and installed into StreamExecutionEnvironment's threadLocalContextEnvironmentFactory and contextEnvironmentFactory. Calling this factory's createExecutionEnvironment method creates a StreamContextEnvironment object, the environment for submitting jobs to a remote distributed Flink cluster. A simplified sketch of this installation follows.
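The sketch below is hypothetical (internal API, constructor arguments trimmed); it only illustrates that setAsContext amounts to registering a factory:
// Sketch only: install a factory that hands out StreamContextEnvironment instances.
StreamExecutionEnvironmentFactory factory =
        conf -> new StreamContextEnvironment(/* executorServiceLoader, conf, userClassloader, ... */);
StreamExecutionEnvironment.initializeContextEnvironment(factory);
Afterwards, getExecutionEnvironment resolves this factory instead of creating a LocalStreamEnvironment.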
Running the Job with execute
The execute method launches the Flink job:
public JobExecutionResult execute() throws Exception {
// getJobName reads the job name from the pipeline.name configuration option
// if it is not set, the default name "Flink Streaming Job" is used
return execute(getJobName());
}
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
return execute(getStreamGraph(jobName));
}
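As an aside, the job name can also be controlled through the pipeline.name option before execute() is called; a small hedged sketch (PipelineOptions.NAME is the standard option with key pipeline.name):
Configuration conf = new Configuration();
// this value is what getJobName() will return
conf.set(PipelineOptions.NAME, "My ETL Job");
env.configure(conf, Thread.currentThread().getContextClassLoader());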
Before executing the job, the getStreamGraph method converts it into a StreamGraph. For how the StreamGraph is generated, see Flink 源码之StreamGraph生成.
From here on, the behavior of execute differs depending on the ExecutionEnvironment.
The execute method of StreamContextEnvironment:
@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
// run the job asynchronously
final JobClient jobClient = executeAsync(streamGraph);
// get the configured job listeners
final List<JobListener> jobListeners = getJobListeners();
try {
// get the job execution result and notify each job listener
final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
jobListeners.forEach(
jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
return jobExecutionResult;
} catch (Throwable t) {
jobListeners.forEach(
jobListener ->
jobListener.onJobExecuted(
null, ExceptionUtils.stripExecutionException(t)));
ExceptionUtils.rethrowException(t);
// never reached, only make javac happy
return null;
}
}
The execute method of LocalStreamEnvironment:
@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
return super.execute(streamGraph);
}
It simply calls the parent class's execute method; the parent class is StreamExecutionEnvironment itself.
Its execute method:
@Internal
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
// run the job asynchronously
final JobClient jobClient = executeAsync(streamGraph);
try {
final JobExecutionResult jobExecutionResult;
// in attached mode the client must stay alive, so wait synchronously for the job execution result
if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
jobExecutionResult = jobClient.getJobExecutionResult().get();
} else {
// detached mode does not need to wait
jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
}
// notify each jobListener
jobListeners.forEach(
jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
return jobExecutionResult;
} catch (Throwable t) {
// get() on the JobExecutionResult Future will throw an ExecutionException. This
// behaviour was largely not there in Flink versions before the PipelineExecutor
// refactoring so we should strip that exception.
Throwable strippedException = ExceptionUtils.stripExecutionException(t);
jobListeners.forEach(
jobListener -> {
jobListener.onJobExecuted(null, strippedException);
});
ExceptionUtils.rethrowException(strippedException);
// never reached, only make javac happy
return null;
}
}
Next we analyze the executeAsync method of StreamContextEnvironment.
@Override
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
// check that execute or executeAsync is not called more than once on this environment
validateAllowedExecution();
// call the parent class's executeAsync method,
// the same one LocalStreamEnvironment ends up in
final JobClient jobClient = super.executeAsync(streamGraph);
if (!suppressSysout) {
System.out.println("Job has been submitted with JobID " + jobClient.getJobID());
}
return jobClient;
}
So the executeAsync paths of the two environments ultimately converge.
Below is the executeAsync method of the StreamExecutionEnvironment class:
@Internal
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
// the streamGraph must not be null
checkNotNull(streamGraph, "StreamGraph cannot be null.");
// the deployment target must be configured; the target is the mode the job runs in,
// e.g. local, remote, YARN or Kubernetes
checkNotNull(
configuration.get(DeploymentOptions.TARGET),
"No execution.target specified in your configuration file.");
// get the executor factory for the job
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
checkNotNull(
executorFactory,
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));
// obtain the executor from the factory and run the streamGraph containing the user job
CompletableFuture<JobClient> jobClientFuture =
executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration, userClassloader);
try {
// notify each job listener that the job has been submitted
JobClient jobClient = jobClientFuture.get();
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
return jobClient;
} catch (ExecutionException executionException) {
final Throwable strippedException =
ExceptionUtils.stripExecutionException(executionException);
jobListeners.forEach(
jobListener -> jobListener.onJobSubmitted(null, strippedException));
throw new FlinkException(
String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
strippedException);
}
}
At this point the concrete logic diverges again between execution environments. The branching point is executorServiceLoader.getExecutorFactory(configuration): different environments obtain different PipelineExecutorFactory implementations. When a LocalStreamEnvironment or a StreamContextEnvironment is created, the executorServiceLoader variable passed in is a DefaultExecutorServiceLoader. Let's look at its getExecutorFactory method:
@Override
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
checkNotNull(configuration);
// load and instantiate the PipelineExecutorFactory implementations declared in the service files
final ServiceLoader<PipelineExecutorFactory> loader =
ServiceLoader.load(PipelineExecutorFactory.class);
final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
final Iterator<PipelineExecutorFactory> factories = loader.iterator();
while (factories.hasNext()) {
try {
final PipelineExecutorFactory factory = factories.next();
// iterate over all loaded factories,
// keeping only those compatible with the configuration
if (factory != null && factory.isCompatibleWith(configuration)) {
compatibleFactories.add(factory);
}
} catch (Throwable e) {
if (e.getCause() instanceof NoClassDefFoundError) {
LOG.info("Could not load factory due to missing dependencies.");
} else {
throw e;
}
}
}
// if more than one compatible factory is found, fail with an error
if (compatibleFactories.size() > 1) {
final String configStr =
configuration.toMap().entrySet().stream()
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.joining("\n"));
throw new IllegalStateException(
"Multiple compatible client factories found for:\n" + configStr + ".");
}
if (compatibleFactories.isEmpty()) {
throw new IllegalStateException("No ExecutorFactory found to execute the application.");
}
// return the single remaining factory
return compatibleFactories.get(0);
}
This method uses the Java SPI mechanism: subclasses of PipelineExecutorFactory are dynamically loaded and instantiated according to the service files under META-INF/services.
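As a quick illustration of the mechanism (com.example.MyExecutorFactory is a hypothetical class name), a service file named after the interface lists one fully-qualified implementation per line, and ServiceLoader instantiates each via its no-arg constructor:
// File META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory contains:
//   com.example.MyExecutorFactory
ServiceLoader<PipelineExecutorFactory> loader =
        ServiceLoader.load(PipelineExecutorFactory.class);
for (PipelineExecutorFactory factory : loader) {
    System.out.println("Discovered factory: " + factory.getClass().getName());
}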
PipelineExecutorFactory and PipelineExecutor
PipelineExecutorFactory
Searching for org.apache.flink.core.execution.PipelineExecutorFactory service files, we find three, located in the flink-clients, flink-yarn and flink-kubernetes sub-projects. Let's go through them one by one.
The org.apache.flink.core.execution.PipelineExecutorFactory file in flink-clients contains:
org.apache.flink.client.deployment.executors.RemoteExecutorFactory
org.apache.flink.client.deployment.executors.LocalExecutorFactory
That is, if the flink-clients dependency is present, RemoteExecutorFactory and LocalExecutorFactory instances will be created. Let's look at their respective isCompatibleWith methods.
The isCompatibleWith method of RemoteExecutorFactory:
@Override
public boolean isCompatibleWith(final Configuration configuration) {
return RemoteExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
}
RemoteExecutorFactory is used only when execution.target is set to remote in the configuration.
The isCompatibleWith method of LocalExecutorFactory:
@Override
public boolean isCompatibleWith(final Configuration configuration) {
return LocalExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
}
LocalExecutorFactory is used only when execution.target is set to local.
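In other words, the winning factory is selected purely by the execution.target option. A hedged sketch of setting it programmatically (DeploymentOptions.TARGET is the standard option with key execution.target):
Configuration configuration = new Configuration();
// execution.target = remote
configuration.set(DeploymentOptions.TARGET, "remote");
// DefaultExecutorServiceLoader.getExecutorFactory(configuration) would now pick
// RemoteExecutorFactory, the only factory whose isCompatibleWith returns true.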
The org.apache.flink.core.execution.PipelineExecutorFactory file in flink-yarn contains:
org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory
Similarly, here we only list the configuration each factory is compatible with, without repeating the isCompatibleWith code.
- YarnJobClusterExecutorFactory: requires execution.target to be yarn-per-job
- YarnSessionClusterExecutorFactory: requires execution.target to be yarn-session
The org.apache.flink.core.execution.PipelineExecutorFactory file in flink-kubernetes contains:
org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutorFactory
KubernetesSessionClusterExecutorFactory requires execution.target to be set to kubernetes-session.
PipelineExecutor
Next we focus on two executors: LocalExecutor and RemoteExecutor. The logic that creates them is straightforward and not covered here.
A PipelineExecutor runs a job through its execute method, which takes three parameters (see the interface sketch after this list):
- pipeline: the job to execute, i.e. the StreamGraph.
- configuration: the job's configuration.
- userCodeClassloader: the classloader for the user job. It is separate from the one Flink itself uses because classes loaded by different user jobs may conflict with each other, and user classes may also conflict with classes loaded by the Flink framework. A dedicated classloader for user jobs avoids these conflicts.
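The interface itself is small; in essence (a sketch consistent with the signatures used below):
public interface PipelineExecutor {
    // Submit the pipeline for execution and complete with a JobClient once submitted.
    CompletableFuture<JobClient> execute(
            Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)
            throws Exception;
}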
LocalExecutor
LocalExecutor runs jobs locally. Its execute method:
@Override
public CompletableFuture<JobClient> execute(
Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)
throws Exception {
// parameter validation
checkNotNull(pipeline);
checkNotNull(configuration);
Configuration effectiveConfig = new Configuration();
effectiveConfig.addAll(this.configuration);
effectiveConfig.addAll(configuration);
// we only support attached execution with the local executor.
// only ATTACHED execution is supported
checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
// convert the StreamGraph into a JobGraph
final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
// create a MiniCluster and submit the job
// through its submitJob method
return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
.submitJob(jobGraph, userCodeClassloader);
}
Before the job can be submitted, the StreamGraph must be converted into a JobGraph; for details see Flink 源码之JobGraph生成.
At the end of the method, PerJobMiniClusterFactory's submitJob method is called. PerJobMiniClusterFactory actually operates on a MiniCluster object. As the name suggests, this is a "mini cluster" in which all jobs run locally.
The submitJob method of PerJobMiniClusterFactory:
public CompletableFuture<JobClient> submitJob(
JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
// build the MiniCluster configuration, fixing the maximum parallelism
MiniClusterConfiguration miniClusterConfig =
getMiniClusterConfig(jobGraph.getMaximumParallelism());
// create a MiniCluster
MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
// start the miniCluster
miniCluster.start();
// submit the job to the miniCluster
return miniCluster
.submitJob(jobGraph)
.thenApplyAsync(
FunctionUtils.uncheckedFunction(
submissionResult -> {
// after submission succeeds, wait until job initialization has finished
org.apache.flink.client.ClientUtils
.waitUntilJobInitializationFinished(
() ->
miniCluster
.getJobStatus(
submissionResult
.getJobID())
.get(),
() ->
miniCluster
.requestJobResult(
submissionResult
.getJobID())
.get(),
userCodeClassloader);
return submissionResult;
}))
.thenApply(
result ->
// create a JobClient that shuts down the MiniCluster when the job finishes
new MiniClusterJobClient(
result.getJobID(),
miniCluster,
userCodeClassloader,
MiniClusterJobClient.JobFinalizationBehavior
.SHUTDOWN_CLUSTER))
.whenComplete(
(ignored, throwable) -> {
if (throwable != null) {
// We failed to create the JobClient and must shutdown to ensure
// cleanup.
shutDownCluster(miniCluster);
}
})
.thenApply(Function.identity());
}
The MiniCluster itself is covered in its own chapter below.
RemoteExecutor
RemoteExecutor's execute method lives in its parent class, AbstractSessionClusterExecutor. Code and analysis:
@Override
public CompletableFuture<JobClient> execute(
@Nonnull final Pipeline pipeline,
@Nonnull final Configuration configuration,
@Nonnull final ClassLoader userCodeClassloader)
throws Exception {
// as before, generate the JobGraph
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
try (final ClusterDescriptor<ClusterID> clusterDescriptor =
clusterClientFactory.createClusterDescriptor(configuration)) {
// get the ID of the remote cluster
final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
checkState(clusterID != null);
final ClusterClientProvider<ClusterID> clusterClientProvider =
clusterDescriptor.retrieve(clusterID);
// create the clusterClient, used to communicate with the remote cluster and submit the job
ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
return clusterClient
.submitJob(jobGraph)
.thenApplyAsync(
FunctionUtils.uncheckedFunction(
jobId -> {
ClientUtils.waitUntilJobInitializationFinished(
() -> clusterClient.getJobStatus(jobId).get(),
() -> clusterClient.requestJobResult(jobId).get(),
userCodeClassloader);
return jobId;
}))
.thenApplyAsync(
jobID ->
(JobClient)
new ClusterClientJobClientAdapter<>(
clusterClientProvider,
jobID,
userCodeClassloader))
.whenComplete((ignored1, ignored2) -> clusterClient.close());
}
}
ClusterClient has two subclasses, one per cluster type: MiniClusterClient and RestClusterClient. MiniClusterClient is used to talk to a MiniCluster; its submitJob method in effect calls MiniCluster's submitJob method.
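Roughly, that delegation looks like this (a sketch, not the verbatim implementation):
// Sketch only: MiniClusterClient forwards the submission to the in-process MiniCluster.
@Override
public CompletableFuture<JobID> submitJob(JobGraph jobGraph) {
    return miniCluster.submitJob(jobGraph)
            .thenApply(JobSubmissionResult::getJobID);
}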
RestClusterClient communicates with a remote cluster via HTTP REST requests. Its submitJob method:
@Override
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
CompletableFuture<java.nio.file.Path> jobGraphFileFuture =
CompletableFuture.supplyAsync(
() -> {
try {
// create an empty temporary file named flink-jobgraph*.bin
// in the system temporary directory
final java.nio.file.Path jobGraphFile =
Files.createTempFile("flink-jobgraph", ".bin");
// serialize the JobGraph object into this temporary file
try (ObjectOutputStream objectOut =
new ObjectOutputStream(
Files.newOutputStream(jobGraphFile))) {
objectOut.writeObject(jobGraph);
}
return jobGraphFile;
} catch (IOException e) {
throw new CompletionException(
new FlinkException("Failed to serialize JobGraph.", e));
}
},
executorService);
// runs once the JobGraph has been written to the file
CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture =
jobGraphFileFuture.thenApply(
jobGraphFile -> {
List<String> jarFileNames = new ArrayList<>(8);
List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames =
new ArrayList<>(8);
Collection<FileUpload> filesToUpload = new ArrayList<>(8);
// add the jobGraphFile to the files to upload
filesToUpload.add(
new FileUpload(
jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
// collect the user jar paths from the JobGraph and add them to the upload list
for (Path jar : jobGraph.getUserJars()) {
jarFileNames.add(jar.getName());
filesToUpload.add(
new FileUpload(
Paths.get(jar.toUri()),
RestConstants.CONTENT_TYPE_JAR));
}
for (Map.Entry<String, DistributedCache.DistributedCacheEntry>
artifacts : jobGraph.getUserArtifacts().entrySet()) {
final Path artifactFilePath =
new Path(artifacts.getValue().filePath);
try {
// Only local artifacts need to be uploaded.
// upload the other artifacts the user job needs;
// only locally stored files have to be uploaded
if (!artifactFilePath.getFileSystem().isDistributedFS()) {
artifactFileNames.add(
new JobSubmitRequestBody.DistributedCacheFile(
artifacts.getKey(),
artifactFilePath.getName()));
filesToUpload.add(
new FileUpload(
Paths.get(artifacts.getValue().filePath),
RestConstants.CONTENT_TYPE_BINARY));
}
} catch (IOException e) {
throw new CompletionException(
new FlinkException(
"Failed to get the FileSystem of artifact "
+ artifactFilePath
+ ".",
e));
}
}
// build the job submission request body
final JobSubmitRequestBody requestBody =
new JobSubmitRequestBody(
jobGraphFile.getFileName().toString(),
jarFileNames,
artifactFileNames);
// return the request body together with the files to upload
return Tuple2.of(
requestBody, Collections.unmodifiableCollection(filesToUpload));
});
// once the request is built, send it
final CompletableFuture<JobSubmitResponseBody> submissionFuture =
requestFuture.thenCompose(
requestAndFileUploads ->
sendRetriableRequest(
JobSubmitHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
requestAndFileUploads.f0,
requestAndFileUploads.f1,
isConnectionProblemOrServiceUnavailable()));
// after the request has been sent, delete the temporary JobGraph file
submissionFuture
.thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)
.thenAccept(
jobGraphFile -> {
try {
Files.delete(jobGraphFile);
} catch (IOException e) {
LOG.warn("Could not delete temporary file {}.", jobGraphFile, e);
}
});
return submissionFuture
.thenApply(ignore -> jobGraph.getJobID())
.exceptionally(
(Throwable throwable) -> {
throw new CompletionException(
new JobSubmissionException(
jobGraph.getJobID(),
"Failed to submit JobGraph.",
ExceptionUtils.stripCompletionException(throwable)));
});
}
The JobManager and TaskManager of a remote cluster are deployed and started separately, which is largely outside the scope of this article, so we do not go into detail here.
For the TaskManager startup process, see Flink 源码之TaskManager启动流程.
MiniCluster
MiniCluster was briefly introduced above; it executes jobs in a local environment.
The start method
The logic for starting the MiniCluster lives in the start method.
public void start() throws Exception {
synchronized (lock) {
checkState(!running, "MiniCluster is already running");
LOG.info("Starting Flink Mini Cluster");
LOG.debug("Using configuration {}", miniClusterConfiguration);
final Configuration configuration = miniClusterConfiguration.getConfiguration();
// whether the components of the MiniCluster share a single RPC service
final boolean useSingleRpcService =
miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
try {
// initialize IO-related settings, e.g. whether to overwrite output and create output directories
initializeIOFormatClasses(configuration);
LOG.info("Starting Metrics Registry");
// create the metric registry
metricRegistry = createMetricRegistry(configuration);
// bring up all the RPC services
LOG.info("Starting RPC Service(s)");
final RpcServiceFactory dispatcherResourceManagerComponentRpcServiceFactory;
final RpcService metricQueryServiceRpcService;
// if a single shared RPC service is used
if (useSingleRpcService) {
// we always need the 'commonRpcService' for auxiliary calls
// create a local RPC service
commonRpcService = createLocalRpcService(configuration);
// create the common RPC service factory
final CommonRpcServiceFactory commonRpcServiceFactory =
new CommonRpcServiceFactory(commonRpcService);
// the TaskManager RPC service factory reuses the common RPC service factory
taskManagerRpcServiceFactory = commonRpcServiceFactory;
dispatcherResourceManagerComponentRpcServiceFactory = commonRpcServiceFactory;
// start the metrics query RPC service
metricQueryServiceRpcService =
MetricUtils.startLocalMetricsRpcService(configuration);
} else {
// start a new service per component, possibly with custom bind addresses
// if the RPC service is not shared, get the JobManager and TaskManager addresses and port ranges
final String jobManagerExternalAddress =
miniClusterConfiguration.getJobManagerExternalAddress();
final String taskManagerExternalAddress =
miniClusterConfiguration.getTaskManagerExternalAddress();
final String jobManagerExternalPortRange =
miniClusterConfiguration.getJobManagerExternalPortRange();
final String taskManagerExternalPortRange =
miniClusterConfiguration.getTaskManagerExternalPortRange();
final String jobManagerBindAddress =
miniClusterConfiguration.getJobManagerBindAddress();
final String taskManagerBindAddress =
miniClusterConfiguration.getTaskManagerBindAddress();
// create dedicated factories and services for each component
dispatcherResourceManagerComponentRpcServiceFactory =
new DedicatedRpcServiceFactory(
configuration,
jobManagerExternalAddress,
jobManagerExternalPortRange,
jobManagerBindAddress);
taskManagerRpcServiceFactory =
new DedicatedRpcServiceFactory(
configuration,
taskManagerExternalAddress,
taskManagerExternalPortRange,
taskManagerBindAddress);
// we always need the 'commonRpcService' for auxiliary calls
// bind to the JobManager address with port 0
commonRpcService =
createRemoteRpcService(configuration, jobManagerBindAddress, 0);
metricQueryServiceRpcService =
MetricUtils.startRemoteMetricsRpcService(
configuration, commonRpcService.getAddress());
}
// start the metrics query service
metricRegistry.startQueryService(metricQueryServiceRpcService, null);
// create the process metric group
processMetricGroup =
MetricUtils.instantiateProcessMetricGroup(
metricRegistry,
RpcUtils.getHostname(commonRpcService),
ConfigurationUtils.getSystemResourceMetricsProbingInterval(
configuration));
// create the IO thread pool
ioExecutor =
Executors.newFixedThreadPool(
ClusterEntrypointUtils.getPoolSize(configuration),
new ExecutorThreadFactory("mini-cluster-io"));
// create the high-availability services
haServices = createHighAvailabilityServices(configuration, ioExecutor);
// start the blobServer
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
// create the heartbeat services
heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
// create the blob cache service
blobCacheService =
new BlobCacheService(
configuration,
haServices.createBlobStore(),
new InetSocketAddress(
InetAddress.getLocalHost(), blobServer.getPort()));
// start the TaskManagers
startTaskManagers();
// create the metric query service retriever
MetricQueryServiceRetriever metricQueryServiceRetriever =
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService());
// create the Dispatcher and ResourceManager components; they run in the same process
setupDispatcherResourceManagerComponents(
configuration,
dispatcherResourceManagerComponentRpcServiceFactory,
metricQueryServiceRetriever);
// create the ResourceManager leader retrieval service
resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
clusterRestEndpointLeaderRetrievalService =
haServices.getClusterRestEndpointLeaderRetriever();
// create the Dispatcher gateway retriever
dispatcherGatewayRetriever =
new RpcGatewayRetriever<>(
commonRpcService,
DispatcherGateway.class,
DispatcherId::fromUuid,
new ExponentialBackoffRetryStrategy(
21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
resourceManagerGatewayRetriever =
new RpcGatewayRetriever<>(
commonRpcService,
ResourceManagerGateway.class,
ResourceManagerId::fromUuid,
new ExponentialBackoffRetryStrategy(
21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
// create the WebMonitor leader retriever
webMonitorLeaderRetriever = new LeaderRetriever();
// start these retrieval services
resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
clusterRestEndpointLeaderRetrievalService.start(webMonitorLeaderRetriever);
} catch (Exception e) {
// cleanup everything
try {
close();
} catch (Exception ee) {
e.addSuppressed(ee);
}
throw e;
}
// create a new termination future
terminationFuture = new CompletableFuture<>();
// now officially mark this as running
running = true;
LOG.info("Flink Mini Cluster started successfully");
}
}
The analysis above walked us through the MiniCluster startup flow. Next, the logic for starting the TaskManagers, located in the startTaskManagers method.
@GuardedBy("lock")
private void startTaskManagers() throws Exception {
// get the number of TaskManagers
final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
LOG.info("Starting {} TaskManger(s)", numTaskManagers);
// start each of them
for (int i = 0; i < numTaskManagers; i++) {
startTaskManager();
}
}
Following the startTaskManager method:
public void startTaskManager() throws Exception {
synchronized (lock) {
final Configuration configuration = miniClusterConfiguration.getConfiguration();
// build the TaskExecutor
final TaskExecutor taskExecutor =
TaskManagerRunner.startTaskManager(
configuration,
new ResourceID(UUID.randomUUID().toString()),
taskManagerRpcServiceFactory.createRpcService(),
haServices,
heartbeatServices,
metricRegistry,
blobCacheService,
useLocalCommunication(),
ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
taskManagerTerminatingFatalErrorHandlerFactory.create(
taskManagers.size()));
// start the TaskExecutor
taskExecutor.start();
taskManagers.add(taskExecutor);
}
}
At this point the TaskManagers are up.
The submitJob method
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
// three futures are created here: obtain the DispatcherGateway, obtain the blob server address, and upload the job files to the blob server
final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture =
getDispatcherGatewayFuture();
final CompletableFuture<InetSocketAddress> blobServerAddressFuture =
createBlobServerAddress(dispatcherGatewayFuture);
final CompletableFuture<Void> jarUploadFuture =
uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
// once they have all completed, call the Dispatcher's submitJob method
final CompletableFuture<Acknowledge> acknowledgeCompletableFuture =
jarUploadFuture
.thenCombine(
dispatcherGatewayFuture,
(Void ack, DispatcherGateway dispatcherGateway) ->
dispatcherGateway.submitJob(jobGraph, rpcTimeout))
.thenCompose(Function.identity());
return acknowledgeCompletableFuture.thenApply(
(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
The flow has now reached the Dispatcher's submitJob method, which we analyze in the next section.
Dispatcher
The Dispatcher is responsible for accepting job submissions and creating the JobManager.
Dispatcher has two subclasses, MiniDispatcher and StandaloneDispatcher, which submit jobs to a MiniCluster and to other standalone clusters respectively. StandaloneDispatcher is the simplest implementation: it does not override any method of its parent class Dispatcher.
Next we analyze MiniDispatcher's submitJob method.
@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
// call Dispatcher's submitJob method
final CompletableFuture<Acknowledge> acknowledgeCompletableFuture =
super.submitJob(jobGraph, timeout);
acknowledgeCompletableFuture.whenComplete(
(Acknowledge ignored, Throwable throwable) -> {
if (throwable != null) {
onFatalError(
new FlinkException(
"Failed to submit job "
+ jobGraph.getJobID()
+ " in job mode.",
throwable));
}
});
return acknowledgeCompletableFuture;
}
Continuing into the parent class Dispatcher's submitJob method:
@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());
try {
// check whether the job is a duplicate
if (isDuplicateJob(jobGraph.getJobID())) {
return FutureUtils.completedExceptionally(
new DuplicateJobSubmissionException(jobGraph.getJobID()));
// check whether only some of the vertices have resources configured
} else if (isPartialResourceConfigured(jobGraph)) {
return FutureUtils.completedExceptionally(
new JobSubmissionException(
jobGraph.getJobID(),
"Currently jobs is not supported if parts of the vertices have "
+ "resources configured. The limitation will be removed in future versions."));
} else {
return internalSubmitJob(jobGraph);
}
} catch (FlinkException e) {
return FutureUtils.completedExceptionally(e);
}
}
The flow then reaches the internalSubmitJob method:
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
// invoke the persistAndRunJob method
final CompletableFuture<Acknowledge> persistAndRunFuture =
waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
.thenApply(ignored -> Acknowledge.get());
return persistAndRunFuture.handleAsync(
(acknowledge, throwable) -> {
if (throwable != null) {
cleanUpJobData(jobGraph.getJobID(), true);
ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
final Throwable strippedThrowable =
ExceptionUtils.stripCompletionException(throwable);
log.error(
"Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
throw new CompletionException(
new JobSubmissionException(
jobGraph.getJobID(),
"Failed to submit job.",
strippedThrowable));
} else {
return acknowledge;
}
},
ioExecutor);
}
Following persistAndRunJob:
private void persistAndRunJob(JobGraph jobGraph) throws Exception {
// persist the JobGraph
jobGraphWriter.putJobGraph(jobGraph);
// call the runJob method
runJob(jobGraph, ExecutionType.SUBMISSION);
}
Now the runJob method:
private void runJob(JobGraph jobGraph, ExecutionType executionType) {
Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
// record the job initialization timestamp
long initializationTimestamp = System.currentTimeMillis();
// kick off the JobManager startup logic
CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
createJobManagerRunner(jobGraph, initializationTimestamp);
DispatcherJob dispatcherJob =
DispatcherJob.createFor(
jobManagerRunnerFuture,
jobGraph.getJobID(),
jobGraph.getName(),
initializationTimestamp);
// store the current job in the runningJobs map
runningJobs.put(jobGraph.getJobID(), dispatcherJob);
final JobID jobId = jobGraph.getJobID();
// handle the job submission result and perform cleanup
final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
dispatcherJob
.getResultFuture()
.handleAsync(
(dispatcherJobResult, throwable) -> {
Preconditions.checkState(
runningJobs.get(jobId) == dispatcherJob,
"The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");
if (dispatcherJobResult != null) {
return handleDispatcherJobResult(
jobId, dispatcherJobResult, executionType);
} else {
return dispatcherJobFailed(jobId, throwable);
}
},
getMainThreadExecutor());
// remove the job once cleanup has completed
final CompletableFuture<Void> jobTerminationFuture =
cleanupJobStateFuture
.thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
.thenCompose(Function.identity());
FutureUtils.assertNoException(jobTerminationFuture);
registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);
}
This is the final step of the MiniCluster job submission flow: creating the JobManagerRunner.
The createJobManagerRunner method:
CompletableFuture<JobManagerRunner> createJobManagerRunner(
JobGraph jobGraph, long initializationTimestamp) {
// get the RPC service
final RpcService rpcService = getRpcService();
return CompletableFuture.supplyAsync(
() -> {
try {
// create a JobManager runner,
// passing in the JobGraph and the other parameters
JobManagerRunner runner =
jobManagerRunnerFactory.createJobManagerRunner(
jobGraph,
configuration,
rpcService,
highAvailabilityServices,
heartbeatServices,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(
jobManagerMetricGroup),
fatalErrorHandler,
initializationTimestamp);
// start the JobManager runner
runner.start();
return runner;
} catch (Exception e) {
throw new CompletionException(
new JobInitializationException(
jobGraph.getJobID(),
"Could not instantiate JobManager.",
e));
}
},
ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on
// JobManager creation
}
The following sections analyze the JobManager startup logic.
JobManagerRunner
JobManagerRunnerFactory
Next we analyze the createJobManagerRunner method of DefaultJobManagerRunnerFactory, the only implementation of JobManagerRunnerFactory.
@Override
public JobManagerRunner createJobManagerRunner(
JobGraph jobGraph,
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
long initializationTimestamp)
throws Exception {
// create the JobManager configuration
final JobMasterConfiguration jobMasterConfiguration =
JobMasterConfiguration.fromConfiguration(configuration);
// create the factory for the SlotPool service and the scheduler;
// it produces the SlotPoolService and the SchedulerNG
final SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory =
DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
configuration, jobGraph.getJobType());
// check whether the REACTIVE feature is enabled;
// it can only be used in standalone mode
// and is an experimental feature
if (jobMasterConfiguration.getConfiguration().get(JobManagerOptions.SCHEDULER_MODE)
== SchedulerExecutionMode.REACTIVE) {
Preconditions.checkState(
slotPoolServiceSchedulerFactory.getSchedulerType()
== JobManagerOptions.SchedulerType.Adaptive,
"Adaptive Scheduler is required for reactive mode");
ReactiveModeUtils.configureJobGraphForReactiveMode(jobGraph);
}
// create the ShuffleMaster, which registers ResultPartitions together with
// their metadata: location, execution id and connection information
final ShuffleMaster<?> shuffleMaster =
ShuffleServiceLoader.loadShuffleServiceFactory(configuration)
.createShuffleMaster(configuration);
// create the JobMaster service factory; the JobMaster service exposes the
// JobMaster's address and the gateway used to communicate with it
final JobMasterServiceFactory jobMasterFactory =
new DefaultJobMasterServiceFactory(
jobMasterConfiguration,
slotPoolServiceSchedulerFactory,
rpcService,
highAvailabilityServices,
jobManagerServices,
heartbeatServices,
jobManagerJobMetricGroupFactory,
fatalErrorHandler,
shuffleMaster);
// create the JobManagerRunnerImpl
return new JobManagerRunnerImpl(
jobGraph,
jobMasterFactory,
highAvailabilityServices,
jobManagerServices
.getLibraryCacheManager()
.registerClassLoaderLease(jobGraph.getJobID()),
jobManagerServices.getScheduledExecutorService(),
fatalErrorHandler,
initializationTimestamp);
}
JobManagerRunner
JobManagerRunner is used to start the JobManager. When it is created, it obtains the user-code classloader, the RunningJobsRegistry (which tracks the job's scheduling status: pending, running, or done; see the sketch below) and the leader election service.
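The scheduling status is a small enum, roughly (a sketch consistent with how it is used later):
// Inside RunningJobsRegistry (sketch):
enum JobSchedulingStatus {
    PENDING, // the job has not been scheduled yet
    RUNNING, // the job has been scheduled and started
    DONE     // the job has finished
}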
Next the JobManager needs to be started. Let's look at the start method:
@Override
public void start() throws Exception {
try {
leaderElectionService.start(this);
} catch (Exception e) {
log.error(
"Could not start the JobManager because the leader election service did not start.",
e);
throw new Exception("Could not start the leader election service.", e);
}
}
The start method has exactly one task: starting the leader election service, which kicks off the leader election process. For a detailed analysis of leader election, see Flink 源码之leader选举.
Once a JobManagerRunner instance is granted leadership, its grantLeadership method is called. grantLeadership belongs to the LeaderContender interface, which every role participating in leader election must implement; JobManagerRunner is no exception.
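For reference, the core of the LeaderContender contract looks roughly like this (a sketch; some methods are omitted):
public interface LeaderContender {
    // called when this contender is elected leader for the given session
    void grantLeadership(UUID leaderSessionID);
    // called when leadership is revoked
    void revokeLeadership();
    // called when the leader election service encounters an error
    void handleError(Exception exception);
}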
Let's look at the grantLeadership method:
@Override
public void grantLeadership(final UUID leaderSessionID) {
synchronized (lock) {
if (shutdown) {
log.debug(
"JobManagerRunner cannot be granted leadership because it is already shut down.");
return;
}
leadershipOperation =
leadershipOperation.thenRun(
ThrowingRunnable.unchecked(
() -> {
synchronized (lock) {
verifyJobSchedulingStatusAndStartJobManager(
leaderSessionID);
}
}));
handleException(leadershipOperation, "Could not start the job manager.");
}
}
Next comes verifying the job scheduling status and starting the JobManager. This logic lives in the verifyJobSchedulingStatusAndStartJobManager method.
@GuardedBy("lock")
private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId)
throws FlinkException {
if (shutdown) {
log.debug("Ignoring starting JobMaster because JobManagerRunner is already shut down.");
return;
}
// check the job scheduling status
final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus =
getJobSchedulingStatus();
// if the job has already finished, run the already-done logic
if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE) {
jobAlreadyDone();
} else {
// otherwise, start the JobMaster
startJobMaster(leaderSessionId);
}
}
After this long journey, it is finally time to start the JobMaster.
@GuardedBy("lock")
private void startJobMaster(UUID leaderSessionId) throws FlinkException {
log.info(
"JobManager runner for job {} ({}) was granted leadership with session id {}.",
jobGraph.getName(),
jobGraph.getJobID(),
leaderSessionId);
try {
// first mark the job as running
runningJobsRegistry.setJobRunning(jobGraph.getJobID());
} catch (IOException e) {
throw new FlinkException(
String.format(
"Failed to set the job %s to running in the running jobs registry.",
jobGraph.getJobID()),
e);
}
// then start the JobMaster service
startJobMasterServiceSafely(leaderSessionId);
if (jobMasterService != null) {
confirmLeaderSessionIdIfStillLeader(jobMasterService, leaderSessionId);
}
}
Following the startJobMasterServiceSafely method:
private void startJobMasterServiceSafely(UUID leaderSessionId) {
checkState(jobMasterService == null, "JobMasterService must be null before being started.");
try {
// create the JobMasterService
final JobMasterService newJobMasterService =
jobMasterServiceFactory.createJobMasterService(
jobGraph,
new JobMasterId(leaderSessionId),
this,
userCodeClassLoader,
initializationTimestamp);
jobMasterService = newJobMasterService;
jobMasterService
.getTerminationFuture()
.whenComplete(
(unused, throwable) -> {
if (throwable != null) {
synchronized (lock) {
// check that we are still running and the JobMasterService
// is still valid
if (!shutdown && newJobMasterService == jobMasterService) {
handleJobManagerRunnerError(throwable);
}
}
}
});
} catch (Exception e) {
resultFuture.complete(
JobManagerRunnerResult.forInitializationFailure(
new JobInitializationException(
jobGraph.getJobID(), "Could not start the JobMaster.", e)));
}
}
The key call in this method is jobMasterServiceFactory.createJobMasterService. JobMasterServiceFactory has a single implementation, DefaultJobMasterServiceFactory, whose createJobMasterService method is:
@Override
public JobMaster createJobMasterService(
JobGraph jobGraph,
JobMasterId jobMasterId,
OnCompletionActions jobCompletionActions,
ClassLoader userCodeClassloader,
long initializationTimestamp)
throws Exception {
// create the JobMaster object
final JobMaster jobMaster =
new JobMaster(
rpcService,
jobMasterId,
jobMasterConfiguration,
ResourceID.generate(),
jobGraph,
haServices,
slotPoolServiceSchedulerFactory,
jobManagerSharedServices,
heartbeatServices,
jobManagerJobMetricGroupFactory,
jobCompletionActions,
fatalErrorHandler,
userCodeClassloader,
shuffleMaster,
lookup ->
new JobMasterPartitionTrackerImpl(
jobGraph.getJobID(), shuffleMaster, lookup),
new DefaultExecutionDeploymentTracker(),
DefaultExecutionDeploymentReconciler::new,
initializationTimestamp);
// start the JobMaster
jobMaster.start();
return jobMaster;
}
At this point the execution flow has moved into the JobMaster object.
JobMaster
The JobMaster is responsible for executing a single JobGraph.
The jobMaster.start() call above invokes JobMaster's onStart method through the RPC service.
@Override
protected void onStart() throws JobMasterException {
try {
startJobExecution();
} catch (Exception e) {
final JobMasterException jobMasterException =
new JobMasterException("Could not start the JobMaster.", e);
handleJobMasterError(jobMasterException);
throw jobMasterException;
}
}
The startJobExecution method starts the JobMaster services and begins scheduling:
private void startJobExecution() throws Exception {
// verify that we are running in the main thread
validateRunsInMainThread();
startJobMasterServices();
log.info(
"Starting execution of job {} ({}) under job master id {}.",
jobGraph.getName(),
jobGraph.getJobID(),
getFencingToken());
startScheduling();
}
The startJobMasterServices method creates the TaskManager heartbeat manager, starts the SlotPoolService and establishes the connection to the ResourceManager leader (the ResourceManager goes through leader election as well).
private void startJobMasterServices() throws Exception {
try {
this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
this.resourceManagerHeartbeatManager =
createResourceManagerHeartbeatManager(heartbeatServices);
// start the slot pool make sure the slot pool now accepts messages for this leader
slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());
// job is ready to go, try to establish connection with resource manager
// - activate leader retrieval for the resource manager
// - on notification of the leader, the connection will be established and
// the slot pool will start requesting slots
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
} catch (Exception e) {
handleStartJobMasterServicesError(e);
}
}
Back to the startScheduling method above:
private void startScheduling() {
schedulerNG.startScheduling();
}
The next section continues with the execution logic of SchedulerNG.
SchedulerNG
SchedulerNG is Flink's scheduler interface. It is responsible for creating the ExecutionGraph from the JobGraph and scheduling the job for execution.
The analysis below is based on the default DefaultScheduler. Its parent class is SchedulerBase, which converts the JobGraph into an ExecutionGraph during initialization.
this.executionGraph =
createAndRestoreExecutionGraph(
jobManagerJobMetricGroup,
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
checkNotNull(shuffleMaster),
checkNotNull(partitionTracker),
checkNotNull(executionDeploymentTracker),
initializationTimestamp,
mainThreadExecutor,
jobStatusListener);
For the detailed process of creating the ExecutionGraph, see Flink 源码之ExecutionGraph.
Back to the scheduling logic: SchedulerBase's startScheduling method calls startSchedulingInternal.
@Override
protected void startSchedulingInternal() {
log.info(
"Starting scheduling with scheduling strategy [{}]",
schedulingStrategy.getClass().getName());
// transition the ExecutionGraph's JobStatus to RUNNING
transitionToRunning();
// run the scheduling strategy's startScheduling method
schedulingStrategy.startScheduling();
}
Next let's look at the startScheduling method of PipelinedRegionSchedulingStrategy, the only implementation of SchedulingStrategy.
@Override
public void startScheduling() {
final Set<SchedulingPipelinedRegion> sourceRegions =
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
.filter(region -> !region.getConsumedResults().iterator().hasNext())
.collect(Collectors.toSet());
maybeScheduleRegions(sourceRegions);
}
This method first builds the sourceRegions set. It takes all pipelined regions (for a detailed description of pipelined regions, see Flink 源码之ExecutionGraph) and keeps only the regions that have no consumed results (the data a region receives from upstream regions is called its consumed results; the data it emits to downstream regions is its produced results). What remains are therefore the source regions of the pipelined topology.
Next up is the maybeScheduleRegions method.
private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
final List<SchedulingPipelinedRegion> regionsSorted =
SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
schedulingTopology, regions);
for (SchedulingPipelinedRegion region : regionsSorted) {
maybeScheduleRegion(region);
}
}
This method sorts the regions in topological order and then calls maybeScheduleRegion on each of them.
private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
// if any consumed result of the region is not in CONSUMABLE state (its data cannot be consumed yet), return
if (!areRegionInputsAllConsumable(region)) {
return;
}
// all vertices in the region must be in CREATED state
checkState(
areRegionVerticesAllInCreatedState(region),
"BUG: trying to schedule a region which is not in CREATED state");
// create the execution vertices' deployment options
final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
regionVerticesSorted.get(region), id -> deploymentOption);
// start allocating resources (slots) to the execution vertices and deploying them
schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
}
The SchedulerOperations interface provides the operations a SchedulingStrategy uses to carry out its scheduling decisions. It has one implementation: DefaultScheduler.
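The interface is essentially a single method (a sketch consistent with the call above):
public interface SchedulerOperations {
    // Allocate slots for the given vertices and deploy them once the slots are assigned.
    void allocateSlotsAndDeploy(List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions);
}
DefaultScheduler implements it as follows: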
@Override
public void allocateSlotsAndDeploy(
final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
// every ExecutionVertex must be in ExecutionState.CREATED
validateDeploymentOptions(executionVertexDeploymentOptions);
// index the deployment options by ExecutionVertexID
final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex =
groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);
// extract all the ExecutionVertexIDs
final List<ExecutionVertexID> verticesToDeploy =
executionVertexDeploymentOptions.stream()
.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
.collect(Collectors.toList());
// record a version for each ExecutionVertex, starting at 1L
final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
executionVertexVersioner.recordVertexModifications(verticesToDeploy);
// switch each ExecutionVertex to ExecutionState.SCHEDULED
transitionToScheduled(verticesToDeploy);
// allocate slots, i.e. the resources needed for execution, to all vertices
final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
allocateSlots(executionVertexDeploymentOptions);
// create the per-vertex deployment info, i.e. a wrapper around ExecutionVertexVersion,
// ExecutionVertexDeploymentOption and SlotExecutionVertexAssignment
final List<DeploymentHandle> deploymentHandles =
createDeploymentHandles(
requiredVersionByVertex,
deploymentOptionsByVertex,
slotExecutionVertexAssignments);
// wait for all vertices to be assigned resources and deployed
waitForAllSlotsAndDeploy(deploymentHandles);
}
Following the waitForAllSlotsAndDeploy method:
private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {
FutureUtils.assertNoException(
assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
}
The assignAllResources method assigns each vertex the resources it needs; deployAll then deploys all the vertices.
private BiFunction<Void, Throwable, Void> deployAll(
final List<DeploymentHandle> deploymentHandles) {
return (ignored, throwable) -> {
propagateIfNonNull(throwable);
for (final DeploymentHandle deploymentHandle : deploymentHandles) {
final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
deploymentHandle.getSlotExecutionVertexAssignment();
final CompletableFuture<LogicalSlot> slotAssigned =
slotExecutionVertexAssignment.getLogicalSlotFuture();
// make sure the slot assignment has completed
checkState(slotAssigned.isDone());
// once the slot is assigned, run deployOrHandleError
FutureUtils.assertNoException(
slotAssigned.handle(deployOrHandleError(deploymentHandle)));
}
return null;
};
}
Next, the deployOrHandleError method:
private BiFunction<Object, Throwable, Void> deployOrHandleError(
final DeploymentHandle deploymentHandle) {
final ExecutionVertexVersion requiredVertexVersion =
deploymentHandle.getRequiredVertexVersion();
final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();
return (ignored, throwable) -> {
// check whether the ExecutionVertex's version has been modified;
// if so, the vertex was deployed by another deployment, so abort this one
if (executionVertexVersioner.isModified(requiredVertexVersion)) {
log.debug(
"Refusing to deploy execution vertex {} because this deployment was "
+ "superseded by another deployment",
executionVertexId);
return null;
}
if (throwable == null) {
// deploy the ExecutionVertex
deployTaskSafe(executionVertexId);
} else {
handleTaskDeploymentFailure(executionVertexId, throwable);
}
return null;
};
}
Then on to the deployTaskSafe method:
private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
try {
final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
// after looking up the ExecutionVertex, deploy it
executionVertexOperations.deploy(executionVertex);
} catch (Throwable e) {
handleTaskDeploymentFailure(executionVertexId, e);
}
}
The deploy method of DefaultExecutionVertexOperations, the only implementation of ExecutionVertexOperations:
@Override
public void deploy(final ExecutionVertex executionVertex) throws JobException {
executionVertex.deploy();
}
It calls the ExecutionVertex's deploy method.
ExecutionVertex
The deploy method of ExecutionVertex:
public void deploy() throws JobException {
currentExecution.deploy();
}
Here currentExecution is an Execution object. Every attempt to execute an ExecutionVertex creates a new Execution; the currentExecution variable holds the most recently created one.
The deploy method of Execution, with commentary:
public void deploy() throws JobException {
assertRunningInJobMasterMainThread();
final LogicalSlot slot = assignedResource;
checkNotNull(
slot,
"In order to deploy the execution we first have to assign a resource via tryAssignResource.");
// Check if the TaskManager died in the meantime
// This only speeds up the response to TaskManagers failing concurrently to deployments.
// The more general check is the rpcTimeout of the deployment call
// the slot must still be alive
if (!slot.isAlive()) {
throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
}
// make sure exactly one deployment call happens from the correct state
// note: the transition from CREATED to DEPLOYING is for testing purposes only
ExecutionState previous = this.state;
// transition the execution state from SCHEDULED or CREATED to DEPLOYING
if (previous == SCHEDULED || previous == CREATED) {
if (!transitionState(previous, DEPLOYING)) {
// race condition, someone else beat us to the deploying call.
// this should actually not happen and indicates a race somewhere else
throw new IllegalStateException(
"Cannot deploy task: Concurrent deployment call race.");
}
} else {
// vertex may have been cancelled, or it was already scheduled
throw new IllegalStateException(
"The vertex must be in CREATED or SCHEDULED state to be deployed. Found state "
+ previous);
}
// check that the slot has been assigned to this very Execution
if (this != slot.getPayload()) {
throw new IllegalStateException(
String.format(
"The execution %s has not been assigned to the assigned slot.", this));
}
try {
// race double check, did we fail/cancel and do we need to release the slot?
// double-check that the state is still DEPLOYING
if (this.state != DEPLOYING) {
slot.releaseSlot(
new FlinkException(
"Actual state of execution "
+ this
+ " ("
+ state
+ ") does not match expected state DEPLOYING."));
return;
}
LOG.info(
"Deploying {} (attempt #{}) with attempt id {} to {} with allocation id {}",
vertex.getTaskNameWithSubtaskIndex(),
attemptNumber,
vertex.getCurrentExecutionAttempt().getAttemptId(),
getAssignedResourceLocation(),
slot.getAllocationId());
// create the task deployment descriptor, used to create the Task
final TaskDeploymentDescriptor deployment =
TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex, attemptNumber)
.createDeploymentDescriptor(
slot.getAllocationId(),
taskRestore,
producedPartitions.values());
// null taskRestore to let it be GC'ed
taskRestore = null;
// get the TaskManagerGateway of the slot's TaskManager,
// used to communicate with it
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
vertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor();
getVertex().notifyPendingDeployment(this);
// We run the submission in the future executor so that the serialization of large TDDs
// does not block
// the main thread and sync back to the main thread once submission is completed.
// RPC call telling the TaskManager to create a Task that runs this Execution
CompletableFuture.supplyAsync(
() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
.thenCompose(Function.identity())
.whenCompleteAsync(
(ack, failure) -> {
if (failure == null) {
vertex.notifyCompletedDeployment(this);
} else {
if (failure instanceof TimeoutException) {
String taskname =
vertex.getTaskNameWithSubtaskIndex()
+ " ("
+ attemptId
+ ')';
markFailed(
new Exception(
"Cannot deploy task "
+ taskname
+ " - TaskManager ("
+ getAssignedResourceLocation()
+ ") not responding after a rpcTimeout of "
+ rpcTimeout,
failure));
} else {
markFailed(failure);
}
}
},
jobMasterMainThreadExecutor);
} catch (Throwable t) {
markFailed(t);
}
}
Finally, after a long and winding journey, we have arrived at the point where the TaskManager executes the task. Task creation and execution deserve their own article. With that, the analysis of the Flink job execution flow is complete.
This post is the author's original work. Discussion and corrections are welcome; please credit the source when reposting.