java提交flink任务到yarn及其系列操作小记

           小帅我找了许久相关博客资料,苦恼于没有一个完整的系列文案,经过小半个月的盲人摸象,总结了些通过apache官方api将flink任务提交到yarn以及standalone上运行,停止任务触发checkpoin并生成对应的savepoint,实现任务续跑等等一系列操作,并记录其中碰到的问题以及对应的解决方案等等。希望这篇小记能给大家带来作用,也希望大家能多多提些宝贵意见。

1、通过java api提交flink任务到yarn

首先一共三种提交模式,我们在这里选择的是application模式提交(至于为什么选择这种模式,给大家推荐一篇博客:Flink 部署模式,session 、pre job、aplication三种主要模式_xuye0606的博客-CSDN博客)。

选定模式后,开始撸代码,废话不多说,上demo:

/**

    * 提交flink任务demo

    *

    * @return

    */

    public String submitDemo() {

        System.setProperty("HADOOP_USER_NAME", "root");   

        System.out.println("==================== Thread.currentThread().getContextClassLoader().getResource:" + Thread.currentThread().getContextClassLoader().getResource(""));

        String configurationDirectory =  Thread.currentThread().getContextClassLoader().getResource("conf").getPath();

        System.out.println("====================configurationDirectory:" + configurationDirectory);

        //你自己的依赖包地址

        String flinkLibs = "hdfs://nnCluster/data/flink/libs";

        //你需要提交的flink任务包

        String userJarPath = "hdfs://nnCluster/data/flink/jars/flink-sql-submit-sdk.jar";

        //将flink提到yarn上所需的依赖包记住要放到libs目录下

        String flinkDistJar = "hdfs://nnCluster/data/flink/libs/flink-yarn_2.11-1.12.0.jar";

        YarnClient yarnClient = YarnClient.createYarnClient();

        String yarnHaEnabled = "true";

        YarnConfiguration yarnConfiguration = new YarnConfiguration();

        yarnConfiguration.set("yarn.resourcemanager.ha.enabled", yarnHaEnabled); //是否激活高可用

        yarnConfiguration.set("yarn.resourcemanager.cluster-id", "yarn-cluster"); // 集群ID,使用自己服务器上配置的集群ID

        if (Boolean.valueOf(yarnHaEnabled)) {

            //高可用需要再配置RMID

            yarnConfiguration.set("yarn.resourcemanager.address.rm1", "10.255.157.235:8050");

            yarnConfiguration.set("yarn.resourcemanager.address.rm2", "10.255.157.233:8050");

            yarnConfiguration.set("yarn.resourcemanager.ha.rm-ids", "rm1,rm2");

        }

        // ----------  hdfs 相关配置 -------------//

        String hdfsNameService = "nnCluster";

        yarnConfiguration.set("dfs.nameservices", hdfsNameService);

        yarnConfiguration.set("fs.defaultFS", hdfsNameService);

        // hadoop高可用节点配置

        String hdfsHaEnabled = "true"; //默认高可用

        if (Boolean.valueOf(hdfsHaEnabled)) {

            // nn1, nn2 都是name节点

            yarnConfiguration.set("dfs.namenode.rpc-address." + hdfsNameService + ".nn1", "10.255.157.230:8020");

            yarnConfiguration.set("dfs.namenode.rpc-address." + hdfsNameService + ".nn2", "10.255.157.234:8020");

            yarnConfiguration.set("dfs.ha.namenodes." + hdfsNameService, "nn1,nn2");

        }

        yarnConfiguration.set("dfs.client.failover.proxy.provider." + hdfsNameService, "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        yarnClient.init(yarnConfiguration);

        yarnClient.start();

        YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever.create(yarnClient);

        //获取flink的配置

        Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);

        flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);

        flinkConfiguration.set(PipelineOptions.JARS, Collections.singletonList(userJarPath));

        YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);

        flinkConfiguration.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(flinkLibs));

        flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);

        //设置为application模式

        flinkConfiguration.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());

        //yarn application name

        flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "你自己的任务名");

        //设置配置,可以设置很多

        flinkConfiguration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, "32位字母加数字组合flink的jobid");

        flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));

        flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));

        flinkConfiguration.set(ConfigOptions.key("env.java.opts").stringType().noDefaultValue(), "-Dflink_job_name=ss_test");

        String hadoopconf = Thread.currentThread().getContextClassLoader().getResource("flink").getPath();

        flinkConfiguration.set(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(), hadoopconf);

        flinkConfiguration.set(CheckpointingOptions.STATE_BACKEND, "filesystem");

        flinkConfiguration.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 20);

        flinkConfiguration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "出发checkpoin生成记录文件的存放地址");

        ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();

//    设置用户jar的参数和主类

        ApplicationConfiguration appConfig = new ApplicationConfiguration(new String[]{submitVO.toJSONString()}, "com.xxxx.bigdataclient.sqlsubmit.SqlSubmit");

        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(flinkConfiguration, yarnConfiguration, yarnClient, clusterInformationRetriever, true);

        ClusterClientProvider<ApplicationId> clusterClientProvider;

        try {

            clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(clusterSpecification, appConfig);

        } catch (ClusterDeploymentException e) {

            e.printStackTrace();

            throw new RuntimeException(e);

        }

        ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();

        ApplicationId applicationId = clusterClient.getClusterId();

        System.out.println(applicationId);

        return null;

    }

提交任务中碰到的问题及其解决方法

1、无法灵活的去加载配置文件

由于"fs.hdfs.hadoopconf"参数key在org.apache.flink.configuration包中未定义,需要自己定义指定

flinkConfiguration.set(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),"core-site.xml和hdfs-site.xml所在位置");

2、上传到hdfs上的文件当前任务没有使用权限

hadoop fs -chown -R root:hdfs 你的路径

3、任务运行后如果存在部分源码包二开需要做相应的调整

我这边对flink的jdbc连接这块做了二开,故需要将对应jar包整合

4、任务提交成功后运行失败

java.lang.NoSuchMethodException: org.apache.hadoop.yarn.api.records.Resource.setResourceInformation(java.lang.String, org.apache.hadoop.yarn.api.records.ResourceInformation)

这是因为引用flink提供的原生的

flink-shaded-hadoop-2-uber-2.8.3-10.0.jar包中对应的类中没有setResourceInformation方法

如下:

样例

此处需要将flink-shaded-hadoop-2-uber-2.8.3-10.0.jar包替换成hadoop自己的hadoop-yarn-api-3.1.1.jar包来获取该方法:


代码样例

2、通过java api停止yarn上的flink任务并将savepoint存到对应的指定目录下

      关于停止操作里面或多或少有一些小细节需要记录,先上代码:

   //前面配置对应相关参数略,可参考提交任务

       YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever.create(yarnClient);

        //获取flink的配置

        Configuration flinkConfiguration = new Configuration();

        flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, "当前需要停止的任务在yarn上的applicationId");

        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(flinkConfiguration, yarnConfiguration, yarnClient, clusterInformationRetriever, true);

        ClusterClientProvider<ApplicationId> clusterClientProvider;

        YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();

        ApplicationId applicationId = clusterClientFactory.getClusterId(flinkConfiguration);

        clusterClientProvider = yarnClusterDescriptor.retrieve(applicationId);

        ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();

        Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();

       //注意此处的jobId是yarn分配的flink运行空间中flink的jobId

        final JobID jobIds;

        jobIds = JobID.fromHexString("需要停止的jobId");

        CompletableFuture<String> completableFuture = clusterClient.stopWithSavepoint(jobIds, true, "之前设置的savepoint目录");

        String savepoint = completableFuture.get();

        System.out.println(savepoint);

停止yarn上flink任务相关小问题

1、 在停止yarn上的相关flink任务之前需要将对应的checlpoint机制打开,否则后续无法续跑。

2、注意JOBID可以使用轮询方法去获取停止,但是个人建议还是记录自己的JobId去定向停止

3、续跑停止任务

yarn上的flink任务的续跑不同于在standalone上运行的flink任务续跑,只要将对应的savepoint获取即可

//其他同提交任务

flinkConfiguration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "你的savepoint的全路径");

续跑yarn上flink任务相关小问题:

1、在yarn上续跑一个flink任务的实质是利用停止或者当前任务失败去出发chekpoint所生成的savepoint源文件来重新提交一个新的flink任务

2、在yarn上停止后得任务不要再用YarnClusterDescriptor得retrieve方法去获取对应任务连接来操作JobGraph去使用triggerSavepoint来进行续跑(此方法适用于standalone模式)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,527评论 5 470
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,314评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,535评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,006评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,961评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,220评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,664评论 3 392
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,351评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,481评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,397评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,443评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,123评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,713评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,801评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,010评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,494评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,075评论 2 341

推荐阅读更多精彩内容