【Flink on k8s】Native Kubernetes Application 部署模式详解

本文对 Flink 的 Application、Per-Job 和 Session 部署模式进行了对比分析。详细介绍了 Native Kubernetes 场景下的 Application 部署模式,并且对整个启动流程进行了源码分析

1.Native Kubernetes Application 简介

1.1 Flink 部署模式简介

Flink 的部署模式有 Application、Per-Job 和 Session 模式

Application、Per-Job 和 Session 部署模式的主要区别:
● 集群与作业的生命周期是否一致
● 资源的隔离程度
● 作业的 mian() 运行在 client 还是集群上

Application 模式的特点:① 作业与 Flink 集群打包在一起,在 JobManager 的启动时候会执行作业的 main 函数直接启动作业,而不需要通过 Flink Client 提交作业。② 作业的生命周期与 Flink 集群的一致,即作业关闭后 Flink 集群也会关闭

说明:Application 模式对比 Per-Job 模式最大的区别是前者使用 executeAsync() 提交作业(不阻塞),而后者使用 execute() 提交作业(阻塞),因此 Application 模式可以运行多个作业

Per-Job 模式的特点:作业与 Flink 集群不是打包在一起,在 JobManager 启动后需要通过 Flink Client 提交作业,即增加了网络传输的压力和客户端的 CPU 资源。

Session 模式的特点:常驻的 JobManager,多个作业共享同一个集群。如果其中一个作业异常导致 TaskManager 关闭,则该 TM 上的全部作业都会重新调度。

部署模式汇总.PNG

1.2 Flink Native Kubernetes Application 架构图

资源调度方面:Flink 支持 Kubernetes、Yarn 和 Mesos 资源调度器

Native 是指可以通过底层的资源调度管理器,实现弹性扩缩容。Native Kubernetes Application 是指 Flink 采用 Application 的部署模式,并使用 Kubernetes 进行资源管理。

用户只需要通过 Flink Client/CLI 启动作业。首先通过 K8s 启动 JobManager(deployment)的同时启动作业,然后通过 JobManager 内部的 K8sResourceManager 模块向 K8s 直接申请 TaskManager 的资源并启动,最后当 TM 注册到 JM 后作业就提交到 TM。用户在整个过程无需指定 TaskManager 资源的数量,而是由 JobManager 向 K8s 按需申请的。

flink native kubernetes application 架构图.png

Flink Application on Native Kubernetes 的实践案例
《Flink on K8s 在阿里巴巴的实践》
《Native Flink on K8s 在小红书的实践》
《Flink on K8s 在京东的持续优化实践》

2.启动流程详解

2.1 启动流程总览

image.png

2.2 启动脚本及其配置

$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=top-speed-windowing-application \
    -Dkubernetes.container.image=172.1.45.167:5000/flink:1.13.6-scala_2.11 \
    local:///opt/flink/examples/streaming/TopSpeedWindowing.jar

Native Kubernetes Application 模式下,启动脚本 ./bin/flink 的必要参数有 --target kubernetes-application-Dkubernetes.cluster-id=***-Dkubernetes.container.image=*** 和 作业 jar 路径 local:///***

2.3 启动 JobManager 和作业

2.3.1 CliFrontend 入口

    public int parseAndRun(String[] args) {
        // 省略...
        try {
            // do action
            switch (action) {
                case ACTION_RUN:
                    run(params);
                    return 0;
                 // 匹配参数 run-application
                case ACTION_RUN_APPLICATION:
                    runApplication(params);
                    return 0;
                case ACTION_LIST:
                    list(params);
                    return 0;
                // 省略...
    }


    protected void runApplication(String[] args) throws Exception {
        //  省略...

        //  创建 ApplicationDeployer 用于创建 Kubernetes ClusterDescriptor
        final ApplicationDeployer deployer =
                new ApplicationClusterDeployer(clusterClientServiceLoader);

        if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
            programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
            effectiveConfiguration =
                    getEffectiveConfiguration(
                            activeCommandLine,
                            commandLine,
                            programOptions,
                            Collections.emptyList());
        } else {
            //  作业参数,例如 jar 路径、main 函数入口、args 入参等等
            programOptions = new ProgramOptions(commandLine);
            programOptions.validate();
            final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
            effectiveConfiguration =
                    getEffectiveConfiguration(
                            activeCommandLine,
                            commandLine,
                            programOptions,
                            Collections.singletonList(uri.toString()));
        }

        final ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(
                        programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
        //  提交用户的作业并在集群中运行其 main 函数
        deployer.run(effectiveConfiguration, applicationConfiguration);
    }

2.3.2 Flink Client 通过 K8s Client 创建集群

public class ApplicationClusterDeployer implements ApplicationDeployer {
    // 省略...
    public <ClusterID> void run(
            final Configuration configuration,
            final ApplicationConfiguration applicationConfiguration)
            throws Exception {
        // 省略...
        // 通过 ClusterClientServiceLoader 创建 KubernetesClusterClientFactory
        final ClusterClientFactory<ClusterID> clientFactory =
                clientServiceLoader.getClusterClientFactory(configuration);
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                clientFactory.createClusterDescriptor(configuration)) {
            // 通过 KubernetesClusterClientFactory 创建 KubernetesClusterDescriptor
            final ClusterSpecification clusterSpecification =
                    clientFactory.getClusterSpecification(configuration);
            // KubernetesClusterDescriptor 创建 application 集群
            clusterDescriptor.deployApplicationCluster(
                    clusterSpecification, applicationConfiguration);
        }
    }
}

public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
    // 省略...
    @Override
    public ClusterClientProvider<String> deployApplicationCluster(
            final ClusterSpecification clusterSpecification,
            final ApplicationConfiguration applicationConfiguration)
            throws ClusterDeploymentException {
        // 省略...
        // 指定集群入口 KubernetesApplicationClusterEntrypoint 部署/启动集群
        final ClusterClientProvider<String> clusterClientProvider =
                deployClusterInternal(
                        KubernetesApplicationClusterEntrypoint.class.getName(),
                        clusterSpecification,
                        false);
        // 省略...
    }


    private ClusterClientProvider<String> deployClusterInternal(
            String entryPoint, ClusterSpecification clusterSpecification, boolean detached)
            throws ClusterDeploymentException {
        // 省略...
        // 设置集群配置,例如启动入口entry、blobserver端口、taskmanager rpc端口、rest端口等等
        flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint);

        // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.
        KubernetesUtils.checkAndUpdatePortConfigOption(
                flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
        KubernetesUtils.checkAndUpdatePortConfigOption(
                flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
        KubernetesUtils.checkAndUpdatePortConfigOption(
                flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);
        // 省略...

        // 配置 JobManager 的 PodTemplate
        try {
            final KubernetesJobManagerParameters kubernetesJobManagerParameters =
                    new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);

            final FlinkPod podTemplate =
                    kubernetesJobManagerParameters
                            .getPodTemplateFilePath()
                            .map(
                                    file ->
                                            KubernetesUtils.loadPodFromTemplateFile(
                                                    client, file, Constants.MAIN_CONTAINER_NAME))
                            .orElse(new FlinkPod.Builder().build());
            // 配置 JobManager 的 Deployment
            // 配置 Deployment 的过程中,利用 CmdJobManagerDecorator 设置 JobManager main container 的启动命令,即 kubernetes-jobmanager.sh kubernetes-application
            final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
                    KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
                            podTemplate, kubernetesJobManagerParameters);

            client.createJobManagerComponent(kubernetesJobManagerSpec);

            return createClusterClientProvider(clusterId);
            // 省略...
        } 
    }
}

public class Fabric8FlinkKubeClient implements FlinkKubeClient {
    @Override
    public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
        final Deployment deployment = kubernetesJMSpec.getDeployment();
        // 省略...
        // 利用  Fabric8 Kubernetes Client 创建 JobManager 的 deployment
        this.internalClient.resourceList(accompanyingResources).createOrReplace();
    }
}

2.3.3 容器内启动集群

public final class KubernetesApplicationClusterEntrypoint extends ApplicationClusterEntryPoint {
    // 省略...
    public static void main(final String[] args) {
        // 省略...
        // 设置作业配置
        PackagedProgram program = null;
        try {
            program = getPackagedProgram(configuration);
        } catch (Exception e) {
            LOG.error("Could not create application program.", e);
            System.exit(1);
        }

        try {
            configureExecution(configuration, program);
        } catch (Exception e) {
            LOG.error("Could not apply application configuration.", e);
            System.exit(1);
        }

        final KubernetesApplicationClusterEntrypoint kubernetesApplicationClusterEntrypoint =
                new KubernetesApplicationClusterEntrypoint(configuration, program);
        // 利用 helper 启动集群
        ClusterEntrypoint.runClusterEntrypoint(kubernetesApplicationClusterEntrypoint);
    }
}

    private void runCluster(Configuration configuration, PluginManager pluginManager)
            throws Exception {
        synchronized (lock) {
            //  初始化 rpcserver、haservice、blobserver等
            initializeServices(configuration, pluginManager);
            //  省略...
            //  DispatcherResourceManagerComponent,其封装Dispatcher、ResourceManager和WebMonitorEndpoint
            final DispatcherResourceManagerComponentFactory
                    dispatcherResourceManagerComponentFactory =
                            createDispatcherResourceManagerComponentFactory(configuration);
            //  内部使用DispatcherRunnerFactory创建DispatcherRunner
            // 接着Dispatcher选主的时候,DefaultDispatcherRunner.grantLeadership() 启动新 dispatcher leader即startNewDispatcherLeaderProcess(),DispatcherLeaderProcess.start()会利用JobDispatcherLeaderProcess.create()创建ApplicationDispatcherBootstrap,最终调用ApplicationDispatcherBootstrap.runApplicationAsync()执行用户作业的main函数
            clusterComponent =
                    dispatcherResourceManagerComponentFactory.create(
                            configuration,
                            ioExecutor,
                            commonRpcService,
                            haServices,
                            blobServer,
                            heartbeatServices,
                            metricRegistry,
                            executionGraphInfoStore,
                            new RpcMetricQueryServiceRetriever(
                                    metricRegistry.getMetricQueryServiceRpcService()),
                            this);
            //  省略...
        }
    }

当 Dispatcher 选择主节点的时候,DefaultDispatcherRunner.grantLeadership() -> DefaultDispatcherRunner.startNewDispatcherLeaderProcess() -> DispatcherLeaderProcess.start() -> JobDispatcherLeaderProcess.create()创建ApplicationDispatcherBootstrap -> ApplicationDispatcherBootstrap.runApplicationAsync() -> ... -> ClientUtils.executeProgram() 调用作业的 main函数

说明:Dispatcher 选主是利用了 Kubernetes Client 的 LeaderElector,通过 KubernetesLeaderElector 封装 LeaderElector,最终利用 LeaderElectionEventHandler 处理选主的回调任务,其样例如下所示。

public class LeaderElectionExample {
    public static void main(String[] args) throws Exception {
        ApiClient client = Config.defaultClient();
        Configuration.setDefaultApiClient(client);
        String lockHolderIdentityName = InetAddress.getLocalHost().getHostAddress();
        // 创建 ConfigMap 锁
        ConfigMapLock lock = new ConfigMapLock( "default", "leader-election-ip", lockHolderIdentityName);
        // Leader 选举的配置
        LeaderElectionConfig leaderElectionConfig =
                new LeaderElectionConfig(lock,
                        Duration.ofMillis(10000),
                        Duration.ofMillis(8000),
                        Duration.ofMillis(2000));

        // 初始化 LeaderElector
        LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);
        // 选举 Leader
        leaderElector.run(
                () -> {
                    System.out.println("Do something when getting leadership.");
                },
                () -> {
                    System.out.println("Do something when losing leadership.");
                });
    }
}

2.3.4 ApplicationDispatcherBootstrap 启动作业

Dispatcher 通过 ApplicationDispatcherBootstrap 利用异步线程和反射机制,执行作业的 mian 函数,并且使用轮训的方式不断查询作业的状态,执行步骤如下:

步骤 1:通过 ThreadLocal 控制 Context 对象,在外部创建好 applicationJobIds 的引用列表并且层层传入,然后利用反射执行用户 main 函数;

步骤 2:在 main 函数中通过执行 execute 或 executeAysnc 生成流图并提交作业,接着把作业 ID 保存到 submitJobIdsapplicationJobIds,因此 ApplicationDispatcherBootstrap 可以获取提交的 jobId

步骤 3:循环每个作业 ID 查询其状态是否为结束状态。如果没有结束,则一直轮训状态;如果全部结束,则退出并关闭集群。

image.png

2.3.5 申请资源启动 TaskManager

说明KubernetesResourceManagerDriver.requestResource 通过 Kubernetes 申请资源启动 TaskManager。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容