本地IDEA提交Flink/Spark任务到集群的工具

Flink任务、Spark任务提交到集群,通常需要将可执行Jar上传到集群,手动执行任务提交指令,如果有配套的大数据平台则需要上传Jar,由调度系统进行任务提交。对开发者来说,本地IDEA调试Flink、Spark任务不涉及对象的序列化及反序列化,任务在本地调试通过后,执行在分布式环境下也可能会出错。而将任务提交到集群进行调试还要走那些繁琐的流程太影响效率了。

因此,为方便大数据开发人员进行快速开发调试,开发了从本地IDEA提交Flink/Spark任务到集群的工具类 flink-spark-submiter。任务提交代码稍加改造后也可以和上层调度系统进行集成,替代脚本模式进行任务提交的方式。

  • 支持Flink yarnPerJob、Standalone 、yarnSession模式下的任务提交。

  • 支持Spark任务以Yarn Cluster模式提交到YARN,支持自动上传用户Jar包,依赖的Spark Jars需要提前上传到HDFS。

  • 支持Spark任务提交到K8s Cluster,执行的jar需要包含在镜像中,任务执行时需要传递镜像名称及可执行文件路径。如果需要操作hive表,则需要传递集群所在文件夹,以及HADOOP_USER_NAME,系统进行Hadoop文件的挂载及环境变量的设置。

Flink 多执行模式任务提交

  • 需要填写Flink任务运行时参数配置,任务运行所在的集群配置路径,本地Flink根路径。项目依赖flink1.10版本。
  • 支持以YarnSession、YarnPerjob、Standalone模式进行任务提交,返回ApplicationId。
  • example模块下包含一个FlinkDemo,打包后会转移到项目的examplJars中,可以尝试进行任务提交。
  • 任务提交后,根据ApplicationId获取任务执行使用的jm、tm日志基本信息,包含日志访问URL,日志总字节大小,根据日志基本信息可以做日志滚动展示,防止Yarn日志过大导致日志读取卡死。

任务提交示例:

   // 可执行jar包路径
    String runJarPath = "/Users/maqi/code/ClustersSubmiter/exampleJars/flink-kafka-reader/flink-kafka-reader.jar";
    // 任务参数
    String[] execArgs = new String[]{"-jobName", "flink110Submit", "--topic", "mqTest01", "--bootstrapServers", "172.16.8.107:9092"};
    // 任务名称
    String jobName = "Flink perjob submit";
    // flink 文件夹路径
    String flinkConfDir = "/Users/maqi/tmp/flink/flink-1.10.0/conf";
    // flink lib包路径
    String flinkJarPath = "/Users/maqi/tmp/flink/flink-1.10.0/lib";
    //  yarn 文件夹路径
    //        String yarnConfDir = "/Users/maqi/tmp/hadoopconf";
    String yarnConfDir = "/Users/maqi/tmp/hadoopconf";
    //  作业依赖的外部文件,例如:udf jar , keytab
    String[] dependFile = new String[]{"/Users/maqi/tmp/flink/flink-1.10.0/README.txt"};
    // 任务提交队列
    String queue = "root.users.hdfs";
    // flink任务执行模式
    String execMode = "yarnPerjob";
    // yarnsession appid配置
    Properties yarnSessionConfProperties = null;
    // savepoint 及并行度相关
    Properties confProperties = new Properties();
    confProperties.setProperty("parallelism", "1");

  JobParamsInfo jobParamsInfo = JobParamsInfo.builder()
            .setExecArgs(execArgs)
            .setName(jobName)
            .setRunJarPath(runJarPath)
            .setDependFile(dependFile)
            .setFlinkConfDir(flinkConfDir)
            .setYarnConfDir(yarnConfDir)
            .setConfProperties(confProperties)
            .setYarnSessionConfProperties(yarnSessionConfProperties)
            .setFlinkJarPath(flinkJarPath)
            .setQueue(queue)
            .build();


   String applicationId = runFlinkJob(jobParamsInfo, execMode);
   //任务启动后,拉取jm,tm日志相关信息。
   Thread.sleep(20000);
   List<String> logsInfo = new RunningLog().getRollingLogBaseInfo(jobParamsInfo, applicationId);
   logsInfo.forEach(System.out::println);

jobmanager日志格式:

{
   "logs":[
       {
           "name":"jobmanager.err ",
           "totalBytes":"555",
           "url":"http://172-16-10-204:8042/node/containerlogs/container_e185_1593317332045_2246_01_000002/admin/jobmanager.err/"
       },
       {
           "name":"jobmanager.log ",
           "totalBytes":"31944",
           "url":"http://172-16-10-204:8042/node/containerlogs/container_e185_1593317332045_2246_01_000002/admin/jobmanager.log/"
       },
       {
           "name":"jobmanager.out ",
           "totalBytes":"0",
           "url":"http://172-16-10-204:8042/node/containerlogs/container_e185_1593317332045_2246_01_000002/admin/jobmanager.out/"
       }
   ],
   "typeName":"jobmanager"
}

taskmanager日志格式:

{
    "logs":[
        {
            "name":"taskmanager.err ",
            "totalBytes":"560",
            "url":"http://node03:8042/node/containerlogs/container_e27_1593571725037_0170_01_000002/admin/taskmanager.err/"
        },
        {
            "name":"taskmanager.log ",
            "totalBytes":"35937",
            "url":"http://node03:8042/node/containerlogs/container_e27_1593571725037_0170_01_000002/admin/taskmanager.log/"
        },
        {
            "name":"taskmanager.out ",
            "totalBytes":"0",
            "url":"http://node03:8042/node/containerlogs/container_e27_1593571725037_0170_01_000002/admin/taskmanager.out/"
        }
    ],
    "otherInfo":"{"dataPort":36218,"freeSlots":0,"hardware":{"cpuCores":4,"freeMemory":241172480,"managedMemory":308700779,"physicalMemory":8201641984},"id":"container_e27_1593571725037_0170_01_000002","path":"akka.tcp://flink@node03:36791/user/taskmanager_0","slotsNumber":1,"timeSinceLastHeartbeat":1593659561129}",
    "typeName":"taskmanager"
}

Spark on yarn 任务提交

  • 填写用户程序包路径、执行参数、集群配置文件夹、安全认证等相关配置。
  • Spark任务提交使用Yarn cluster模式,使用的Spark Jar需要提前上传到HDFS并作为archive的参数。
  • 针对SparkSQL任务,通过提交examples中的spark-sql-proxy程序包来直接操作hive表。

提交示例:

public static void main(String[] args) throws Exception {
        boolean openKerberos = true;
        String appName = "todd spark submit";
        String runJarPath = "/Users/maqi/code/ClustersSubmiter/exampleJars/spark-sql-proxy/spark-sql-proxy.jar";
        String mainClass = "cn.todd.spark.SparksqlProxy";
        String yarnConfDir = "/Users/maqi/tmp/hadoopconf";
        String principal = "hdfs/node1@DTSTACK.COM";
        String keyTab = "/Users/maqi/tmp/hadoopconf/hdfs.keytab";
        String jarHdfsDir = "sparkproxy2";
        String archive = "hdfs://nameservice1/sparkjars/jars";
        String queue = "root.users.hdfs";
        String execArgs = getExampleJobParams();

        Properties confProperties = new Properties();
        confProperties.setProperty("spark.executor.cores","2");

        JobParamsInfo jobParamsInfo = JobParamsInfo.builder()
                .setAppName(appName)
                .setRunJarPath(runJarPath)
                .setMainClass(mainClass)
                .setYarnConfDir(yarnConfDir)
                .setPrincipal(principal)
                .setKeytab(keyTab)
                .setJarHdfsDir(jarHdfsDir)
                .setArchivePath(archive)
                .setQueue(queue)
                .setExecArgs(execArgs)
                .setConfProperties(confProperties)
                .setOpenKerberos(BooleanUtils.toString(openKerberos, "true", "false"))
                .build();

        YarnConfiguration yarnConf = YarnConfLoaderUtil.getYarnConf(yarnConfDir);
        String applicationId = "";

        if (BooleanUtils.toBoolean(openKerberos)) {
            UserGroupInformation.setConfiguration(yarnConf);
            UserGroupInformation userGroupInformation = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
            applicationId = userGroupInformation.doAs((PrivilegedExceptionAction<String>) () -> LauncherMain.run(jobParamsInfo, yarnConf));
        } else {
            LauncherMain.run(jobParamsInfo, yarnConf);
        }

        System.out.println(applicationId);
    }

Spark on k8s 任务提交

  • 基于Spark2.4.4进行开发,通过将spark-sql-proxy.jar包打入镜像来执行Sparksql并操作Hive表,无其他特殊操作。
  • 操作Hive时需要传递hadoopConfDir,程序会自动将.xml文件内容进行挂载,如果非root用户操作Hive,需要设置HADOOP_USER_NAME。
  • 通过读取kubeConfig配置文件进行Kuberclient的创建,而非官方提供的master url方式。
  • 任务提交后立即返回spark-app-selector id,从而进行POD状态获取。
  public static void main(String[] args) throws Exception {
      String appName = "todd spark submit";
      // 镜像内的jar路径
      String runJarPath = "local:///opt/dtstack/spark/spark-sql-proxy.jar";
      String mainClass = "cn.todd.spark.SparksqlProxy";
      String hadoopConfDir = "/Users/maqi/tmp/hadoopconf/";
      String kubeConfig = "/Users/maqi/tmp/conf/k8s.config";
      String imageName = "mqspark:2.4.4";
      String execArgs = getExampleJobParams();

      Properties confProperties = new Properties();
      confProperties.setProperty("spark.executor.instances", "2");
      confProperties.setProperty("spark.kubernetes.namespace", "default");
      confProperties.setProperty("spark.kubernetes.authenticate.driver.serviceAccountName", "spark");
      confProperties.setProperty("spark.kubernetes.container.image.pullPolicy", "IfNotPresent");


      JobParamsInfo jobParamsInfo = JobParamsInfo.builder()
              .setAppName(appName)
              .setRunJarPath(runJarPath)
              .setMainClass(mainClass)
              .setExecArgs(execArgs)
              .setConfProperties(confProperties)
              .setHadoopConfDir(hadoopConfDir)
              .setKubeConfig(kubeConfig)
              .setImageName(imageName)
              .build();

      String id = run(jobParamsInfo);
      System.out.println(id);
  }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342