四、Azkaban各种类型的Job编写

一、概述

原生的 Azkaban 支持的plugin类型有以下这些:

  1. command:Linux shell命令行任务
  2. gobblin:通用数据采集工具
  3. hadoopJava:运行hadoopMR任务
  4. java:原生java任务
  5. hive:支持执行hiveSQL
  6. pig:pig脚本任务
  7. spark:spark任务
  8. hdfsToTeradata:把数据从hdfs导入Teradata
  9. teradataToHdfs:把数据从Teradata导入hdfs

其中最简单而且最常用的是command类型,我们在上一篇文章中已经描述了如何编写一个command的job任务。如果使用command类型,效果其实跟在本地执行Linux shell命令一样,这样的话,还不如把shell放到crontable 中运行。所以我们把重点放到Azkaban支持的比较常用的四种类型:java、hadoopJava、hive、spark

二、java类型

1、代码编写:MyJavaJob.java

package com.dataeye.java;

public class MyJavaJob {

    public static void main(String[] args) {
        System.out.println("#################################");
        System.out.println("####  MyJavaJob class exec... ###");
        System.out.println("#################################");
    }

}

2、打包成jar文件:使用maven或者eclipse导出为jar文件

3、编写job文件:java.job

type=javaprocess

classpath=./lib/*,${azkaban.home}/lib/*

java.class=com.dataeye.java.MyJavaJob

4、组成一个完整的运行包
新建一个目录,在该目录下创建一个lib文件夹,把第二步打包的jar文件放到这里,把job文件放到和lib文件夹同一级的目录下,如下所示:


完整的运行包

5、打包成zip文件

把lib目录和job文件打包成zip文件,如下的java.zip:

zip文件

6、提交运行,过程跟之前文章介绍的步骤一样,不再详述,执行结果如下:

执行结果

从输出日志可以看出,代码已经正常执行。

以上是java类型的任务编写和执行的过程。接下来介绍其他任务编写的时候,只会介绍代码的编写和job的编写,其他过程与上面的一致。

三、hadoopJava类型

1、数据准备

以下内容是运行wordcount任务时需要的输入文件input.txt:

1   Ross    male    33  3674
2   Julie   male    42  2019
3   Gloria  female  45  3567
4   Carol   female  36  2813
5   Malcolm male    42  2856
6   Joan    female  22  2235
7   Niki    female  27  3682
8   Betty   female  20  3001
9   Linda   male    21  2511
10  Whitney male    35  3075
11  Lily    male    27  3645
12  Fred    female  39  2202
13  Gary    male    28  3925
14  William female  38  2056
15  Charles male    48  2981
16  Michael male    25  2606
17  Karl    female  32  2260
18  Barbara male    39  2743
19  Elizabeth   female  26  2726
20  Helen   female  47  2457
21  Katharine   male    45  3638
22  Lee female  43  3050
23  Ann male    35  2874
24  Diana   male    37  3929
25  Fiona   female  45  2955
26  Bob female  21  3382
27  John    male    48  3677
28  Thomas  female  22  2784
29  Dean    male    38  2266
30  Paul    female  31  2679

把input.txt文件拷贝到hdfs的 /data/yann/input 目录下

2、代码准备:

package azkaban.jobtype.examples.java;

import azkaban.jobtype.javautils.AbstractHadoopJob;
import azkaban.utils.Props;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.log4j.Logger;

public class WordCount extends AbstractHadoopJob
{
  private static final Logger logger = Logger.getLogger(WordCount.class);
  private final String inputPath;
  private final String outputPath;
  private boolean forceOutputOverrite;

  public WordCount(String name, Props props)
  {
    super(name, props);
    this.inputPath = props.getString("input.path");
    this.outputPath = props.getString("output.path");
    this.forceOutputOverrite = props.getBoolean("force.output.overwrite", false);
  }

  public void run()
    throws Exception
  {
    logger.info(String.format("Starting %s", new Object[] { getClass().getSimpleName() }));

    JobConf jobconf = getJobConf();
    jobconf.setJarByClass(WordCount.class);

    jobconf.setOutputKeyClass(Text.class);
    jobconf.setOutputValueClass(IntWritable.class);

    jobconf.setMapperClass(Map.class);
    jobconf.setReducerClass(Reduce.class);

    jobconf.setInputFormat(TextInputFormat.class);
    jobconf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.addInputPath(jobconf, new Path(this.inputPath));
    FileOutputFormat.setOutputPath(jobconf, new Path(this.outputPath));

    if (this.forceOutputOverrite)
    {
      FileSystem fs = FileOutputFormat.getOutputPath(jobconf).getFileSystem(jobconf);
      fs.delete(FileOutputFormat.getOutputPath(jobconf), true);
    }

    super.run();
  }

  public static class Map extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable>
  {
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private long numRecords = 0L;

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
    {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        this.word.set(tokenizer.nextToken());
        output.collect(this.word, one);
        reporter.incrCounter(Counters.INPUT_WORDS, 1L);
      }

      if (++this.numRecords % 100L == 0L)
        reporter.setStatus("Finished processing " + this.numRecords + " records " + "from the input file");
    }

    static enum Counters
    {
      INPUT_WORDS;
    }
  }

  public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable>
  {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException
    {
      int sum = 0;
      while (values.hasNext()) {
        sum += ((IntWritable)values.next()).get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }
}

3、编写job文件

wordcount.job文件内容如下:

type=hadoopJava

job.extend=false

job.class=azkaban.jobtype.examples.java.WordCount

classpath=./lib/*,${azkaban.home}/lib/*

force.output.overwrite=true

input.path=/data/yann/input

output.path=/data/yann/output

这样hadoopJava类型的任务已经完成,打包提交到Azkaban中执行即可

四、hive类型

1、编写 hive.sql文件

use azkaban;

INSERT OVERWRITE TABLE 
 user_table1 PARTITION (day_p='2017-02-08') 
SELECT appid,uid,country,province,city 
 FROM user_table0 where adType=1;

以上是标准的hive的sql脚本,首先切换到azkaban数据库,然后把user_table0 的数据插入到user_table1 表的指定day_p分区。需要先准备好 user_table0 和 user_table1 表结构和数据。

编写完成后,把文件放入 res 文件夹中。

2、编写hive.job文件

type=hive

user.to.proxy=azkaban

classpath=./lib/*,${azkaban.home}/lib/*

azk.hive.action=execute.query

hive.script=res/hive.sql

关键的参数是 hive.script,该参数指定使用的sql脚本在 res目录下的hive.sql文件

五、spark类型

spark任务有两种运行方式,一种是command类型,另一种是spark类型

首先准备好spark任务的代码

package com.dataeye.template.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext}

object WordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage:WordCount <hdfs_file>")
      System.exit(1)
    }

    System.out.println("get first param ==> " + args(0))
    System.out.println("get second param ==> " + args(1))

    /** spark 2.0的方式
      * val spark = SparkSession.builder().appName("WordCount").getOrCreate()
      */
    val sc = new SparkContext(new SparkConf().setAppName("WordCount"))
    val spark = new SQLContext(sc)
    val file = spark.sparkContext.textFile(args(0))
    val wordCounts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    // 数据collect 到driver端打印
    wordCounts.collect().foreach(println _)
  }
}

然后准备数据,数据就使用前面hadoopJava中的数据即可。

最后打包成jar文件:spark-template-1.0-SNAPSHOT.jar

1、command类型

command类型的配置方式比较简单,spark.job文件如下:

type=command

command=${spark.home}/bin/spark-submit --master yarn-cluster --class com.dataeye.template.spark.WordCount lib/spark-template-1.0-SNAPSHOT.jar   hdfs://de-hdfs/data/yann/info.txt   paramtest

2、spark类型

type=spark

master=yarn-cluster
execution-jar=lib/spark-template-1.0-SNAPSHOT.jar
class=com.dataeye.template.spark.WordCount
params=hdfs://de-hdfs/data/yann/info.txt  paramtest

以上就是Azkaban支持的几种常用的任务类型。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,378评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,356评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,702评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,259评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,263评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,036评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,349评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,979评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,469评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,938评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,059评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,703评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,257评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,262评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,501评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,792评论 2 345

推荐阅读更多精彩内容