本文对 Flink 的 Application、Per-Job 和 Session 部署模式进行了对比分析。详细介绍了 Native Kubernetes 场景下的 Application 部署模式,并且对整个启动流程进行了源码分析。
1.Native Kubernetes Application 简介
1.1 Flink 部署模式简介
Flink 的部署模式有 Application、Per-Job 和 Session 模式。
Application、Per-Job 和 Session 部署模式的主要区别:
● 集群与作业的生命周期是否一致
● 资源的隔离程度
● 作业的mian()
运行在 client 还是集群上
Application 模式的特点:① 作业与 Flink 集群打包在一起,在 JobManager 的启动时候会执行作业的 main 函数直接启动作业,而不需要通过 Flink Client 提交作业。② 作业的生命周期与 Flink 集群的一致,即作业关闭后 Flink 集群也会关闭
说明:Application 模式对比 Per-Job 模式最大的区别是前者使用
executeAsync()
提交作业(不阻塞),而后者使用execute()
提交作业(阻塞),因此 Application 模式可以运行多个作业
Per-Job 模式的特点:作业与 Flink 集群不是打包在一起,在 JobManager 启动后需要通过 Flink Client 提交作业,即增加了网络传输的压力和客户端的 CPU 资源。
Session 模式的特点:常驻的 JobManager,多个作业共享同一个集群。如果其中一个作业异常导致 TaskManager 关闭,则该 TM 上的全部作业都会重新调度。
1.2 Flink Native Kubernetes Application 架构图
资源调度方面:Flink 支持 Kubernetes、Yarn 和 Mesos 资源调度器
Native 是指可以通过底层的资源调度管理器,实现弹性扩缩容。Native Kubernetes Application 是指 Flink 采用 Application 的部署模式,并使用 Kubernetes 进行资源管理。
用户只需要通过 Flink Client/CLI 启动作业。首先通过 K8s 启动 JobManager(deployment)的同时启动作业,然后通过 JobManager 内部的 K8sResourceManager 模块向 K8s 直接申请 TaskManager 的资源并启动,最后当 TM 注册到 JM 后作业就提交到 TM。用户在整个过程无需指定 TaskManager 资源的数量,而是由 JobManager 向 K8s 按需申请的。
Flink Application on Native Kubernetes 的实践案例:
《Flink on K8s 在阿里巴巴的实践》
《Native Flink on K8s 在小红书的实践》
《Flink on K8s 在京东的持续优化实践》
2.启动流程详解
2.1 启动流程总览
2.2 启动脚本及其配置
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=top-speed-windowing-application \
-Dkubernetes.container.image=172.1.45.167:5000/flink:1.13.6-scala_2.11 \
local:///opt/flink/examples/streaming/TopSpeedWindowing.jar
Native Kubernetes Application 模式下,启动脚本 ./bin/flink
的必要参数有 --target kubernetes-application
、-Dkubernetes.cluster-id=***
、-Dkubernetes.container.image=***
和 作业 jar 路径 local:///***
2.3 启动 JobManager 和作业
2.3.1 CliFrontend 入口
public int parseAndRun(String[] args) {
// 省略...
try {
// do action
switch (action) {
case ACTION_RUN:
run(params);
return 0;
// 匹配参数 run-application
case ACTION_RUN_APPLICATION:
runApplication(params);
return 0;
case ACTION_LIST:
list(params);
return 0;
// 省略...
}
protected void runApplication(String[] args) throws Exception {
// 省略...
// 创建 ApplicationDeployer 用于创建 Kubernetes ClusterDescriptor
final ApplicationDeployer deployer =
new ApplicationClusterDeployer(clusterClientServiceLoader);
if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.emptyList());
} else {
// 作业参数,例如 jar 路径、main 函数入口、args 入参等等
programOptions = new ProgramOptions(commandLine);
programOptions.validate();
final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.singletonList(uri.toString()));
}
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(
programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
// 提交用户的作业并在集群中运行其 main 函数
deployer.run(effectiveConfiguration, applicationConfiguration);
}
2.3.2 Flink Client 通过 K8s Client 创建集群
public class ApplicationClusterDeployer implements ApplicationDeployer {
// 省略...
public <ClusterID> void run(
final Configuration configuration,
final ApplicationConfiguration applicationConfiguration)
throws Exception {
// 省略...
// 通过 ClusterClientServiceLoader 创建 KubernetesClusterClientFactory
final ClusterClientFactory<ClusterID> clientFactory =
clientServiceLoader.getClusterClientFactory(configuration);
try (final ClusterDescriptor<ClusterID> clusterDescriptor =
clientFactory.createClusterDescriptor(configuration)) {
// 通过 KubernetesClusterClientFactory 创建 KubernetesClusterDescriptor
final ClusterSpecification clusterSpecification =
clientFactory.getClusterSpecification(configuration);
// KubernetesClusterDescriptor 创建 application 集群
clusterDescriptor.deployApplicationCluster(
clusterSpecification, applicationConfiguration);
}
}
}
public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
// 省略...
@Override
public ClusterClientProvider<String> deployApplicationCluster(
final ClusterSpecification clusterSpecification,
final ApplicationConfiguration applicationConfiguration)
throws ClusterDeploymentException {
// 省略...
// 指定集群入口 KubernetesApplicationClusterEntrypoint 部署/启动集群
final ClusterClientProvider<String> clusterClientProvider =
deployClusterInternal(
KubernetesApplicationClusterEntrypoint.class.getName(),
clusterSpecification,
false);
// 省略...
}
private ClusterClientProvider<String> deployClusterInternal(
String entryPoint, ClusterSpecification clusterSpecification, boolean detached)
throws ClusterDeploymentException {
// 省略...
// 设置集群配置,例如启动入口entry、blobserver端口、taskmanager rpc端口、rest端口等等
flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint);
// Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);
// 省略...
// 配置 JobManager 的 PodTemplate
try {
final KubernetesJobManagerParameters kubernetesJobManagerParameters =
new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
final FlinkPod podTemplate =
kubernetesJobManagerParameters
.getPodTemplateFilePath()
.map(
file ->
KubernetesUtils.loadPodFromTemplateFile(
client, file, Constants.MAIN_CONTAINER_NAME))
.orElse(new FlinkPod.Builder().build());
// 配置 JobManager 的 Deployment
// 配置 Deployment 的过程中,利用 CmdJobManagerDecorator 设置 JobManager main container 的启动命令,即 kubernetes-jobmanager.sh kubernetes-application
final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
podTemplate, kubernetesJobManagerParameters);
client.createJobManagerComponent(kubernetesJobManagerSpec);
return createClusterClientProvider(clusterId);
// 省略...
}
}
}
public class Fabric8FlinkKubeClient implements FlinkKubeClient {
@Override
public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
final Deployment deployment = kubernetesJMSpec.getDeployment();
// 省略...
// 利用 Fabric8 Kubernetes Client 创建 JobManager 的 deployment
this.internalClient.resourceList(accompanyingResources).createOrReplace();
}
}
2.3.3 容器内启动集群
public final class KubernetesApplicationClusterEntrypoint extends ApplicationClusterEntryPoint {
// 省略...
public static void main(final String[] args) {
// 省略...
// 设置作业配置
PackagedProgram program = null;
try {
program = getPackagedProgram(configuration);
} catch (Exception e) {
LOG.error("Could not create application program.", e);
System.exit(1);
}
try {
configureExecution(configuration, program);
} catch (Exception e) {
LOG.error("Could not apply application configuration.", e);
System.exit(1);
}
final KubernetesApplicationClusterEntrypoint kubernetesApplicationClusterEntrypoint =
new KubernetesApplicationClusterEntrypoint(configuration, program);
// 利用 helper 启动集群
ClusterEntrypoint.runClusterEntrypoint(kubernetesApplicationClusterEntrypoint);
}
}
private void runCluster(Configuration configuration, PluginManager pluginManager)
throws Exception {
synchronized (lock) {
// 初始化 rpcserver、haservice、blobserver等
initializeServices(configuration, pluginManager);
// 省略...
// DispatcherResourceManagerComponent,其封装Dispatcher、ResourceManager和WebMonitorEndpoint
final DispatcherResourceManagerComponentFactory
dispatcherResourceManagerComponentFactory =
createDispatcherResourceManagerComponentFactory(configuration);
// 内部使用DispatcherRunnerFactory创建DispatcherRunner
// 接着Dispatcher选主的时候,DefaultDispatcherRunner.grantLeadership() 启动新 dispatcher leader即startNewDispatcherLeaderProcess(),DispatcherLeaderProcess.start()会利用JobDispatcherLeaderProcess.create()创建ApplicationDispatcherBootstrap,最终调用ApplicationDispatcherBootstrap.runApplicationAsync()执行用户作业的main函数
clusterComponent =
dispatcherResourceManagerComponentFactory.create(
configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
executionGraphInfoStore,
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService()),
this);
// 省略...
}
}
当 Dispatcher 选择主节点的时候,DefaultDispatcherRunner.grantLeadership() -> DefaultDispatcherRunner.startNewDispatcherLeaderProcess() -> DispatcherLeaderProcess.start() -> JobDispatcherLeaderProcess.create()创建ApplicationDispatcherBootstrap -> ApplicationDispatcherBootstrap.runApplicationAsync() -> ... -> ClientUtils.executeProgram() 调用作业的 main函数
说明:Dispatcher 选主是利用了 Kubernetes Client 的
LeaderElector
,通过KubernetesLeaderElector
封装 LeaderElector,最终利用LeaderElectionEventHandler
处理选主的回调任务,其样例如下所示。
public class LeaderElectionExample {
public static void main(String[] args) throws Exception {
ApiClient client = Config.defaultClient();
Configuration.setDefaultApiClient(client);
String lockHolderIdentityName = InetAddress.getLocalHost().getHostAddress();
// 创建 ConfigMap 锁
ConfigMapLock lock = new ConfigMapLock( "default", "leader-election-ip", lockHolderIdentityName);
// Leader 选举的配置
LeaderElectionConfig leaderElectionConfig =
new LeaderElectionConfig(lock,
Duration.ofMillis(10000),
Duration.ofMillis(8000),
Duration.ofMillis(2000));
// 初始化 LeaderElector
LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);
// 选举 Leader
leaderElector.run(
() -> {
System.out.println("Do something when getting leadership.");
},
() -> {
System.out.println("Do something when losing leadership.");
});
}
}
2.3.4 ApplicationDispatcherBootstrap 启动作业
Dispatcher 通过 ApplicationDispatcherBootstrap
利用异步线程和反射机制,执行作业的 mian 函数,并且使用轮训的方式不断查询作业的状态,执行步骤如下:
步骤 1:通过 ThreadLocal
控制 Context 对象,在外部创建好 applicationJobIds
的引用列表并且层层传入,然后利用反射执行用户 main 函数;
步骤 2:在 main 函数中通过执行 execute 或 executeAysnc 生成流图并提交作业,接着把作业 ID 保存到 submitJobIds
即 applicationJobIds
,因此 ApplicationDispatcherBootstrap
可以获取提交的 jobId
步骤 3:循环每个作业 ID 查询其状态是否为结束状态。如果没有结束,则一直轮训状态;如果全部结束,则退出并关闭集群。
2.3.5 申请资源启动 TaskManager
说明:
KubernetesResourceManagerDriver.requestResource
通过 Kubernetes 申请资源启动 TaskManager。