关于gobblin应用-mapreduce+azkaban

一。 gobblin部署【mapreduce模式】

1.源码编译:下载源码链接

a。解压文件 tar -zvxf incubator-gobblin-release-0.14.0.tar.gz

b。切换到解压文件夹根目录【gradle安装自行参考网上】需要保持指定的hadoop版本和线上集群一致(最起码是兼容的)

 ./gradlew build -PhadoopVersion=2.6.0-cdh5.14.2 -Pversion=0.14.0 -PuseHadoop2 -x test

注:若是编译过程下载jar过慢,修改下build.gradle指定从本地maven仓库或阿里云gradle库

2.安装部署

a。将编译后的gz包上传到对应的服务器

   tar -zvxf incubator-gobblin-release-0.14.0.tar.gz

b。配置【此处部署采用mapreduce模式】

   export GOBBLIN_JOB_CONFIG_DIR=/usr/local/gobblin/configdir  #用于指定自身调度job配置文件地址 可以不设定,通过命令行参数:--conf来指定
   export GOBBLIN_WORK_DIR=/user/adanalysis/test/gobblin_workdir #用于指定gobblin执行过程涉及到state/stage/output等设置 在非standalone模式下 该地址都是hdfs上存在的地址;通过--workdir来设置
  export HADOOP_BIN_DIR=/opt/cloudera/parcels/CDH/lib/hadoop/bin #用于指定该机器上hadoop bin目录
  • bin/gobblin-mapreduce.sh设置【通常设置的不多】

    一般来需要调整的是libjars 参数的内容,指定第三方依赖的jar包

3.测试:模拟hadoop自身的Distcp的任务

配置文件如下:

#job.template = "resource:///hdfs2hdfs.template"
# setting path
gobblin.flow.edge.input.dataset.descriptor.path=/user/dalan/tests/test_cal_logs
gobblin.flow.edge.output.dataset.descriptor.path=/user/dalan/tests/test_cal_logs_tmp

# settings hdfs
source.data.node.fs.uri=hdfs://localhost:9000 #使用时改成自己的hdfs集群namenode地址
destination.data.node.fs.uri=hdfs://localhost:9000

user.to.proxy=adanalysis #涉及到用户授权的时候 来设定指定job的用户
# gobblin settings
specExecInstance.job.type=hadoopJava   
specExecInstance.job.launcher.type=MAPREDUCE
specExecInstance.job.launcher.class=org.apache.gobblin.runtime.mapreduce.MRJobLauncher

# start
# ====================================================================
# Job configurations
# ====================================================================
job.name=Distcp

# Source and destination paths to be obtained from flow config.
from=${gobblin.flow.edge.input.dataset.descriptor.path}
to=${gobblin.flow.edge.output.dataset.descriptor.path}

#Will delete files in target if not exist in source
#gobblin.copy.recursive.update=true
#gobblin.copy.recursive.delete=true
#gobblin.copy.recursive.deleteEmptyDirectories=true
#gobblin.trash.skip.trash=true

#Will make the job fail if there's any failure
#gobblin.copy.abortOnSingleDatasetFailure=true

#gobblin.copy.preserved.attributes=p

#Job properties to be resolved from source and dest data node config.
fs.uri=${source.data.node.fs.uri}
source.filebased.fs.uri=${fs.uri}
state.store.fs.uri=${fs.uri}/user/dalan/tests
target.filebased.fs.uri=${destination.data.node.fs.uri}
writer.fs.uri=${target.filebased.fs.uri}

work.dir=/user/dalan/tests/gobblin_workdir #保持是在hdfs上的地址 通过参数--workdir来指定也可以

# ====================================================================
# Distcp configurations
# ====================================================================
extract.namespace=gobblin.copy

gobblin.dataset.profile.class=org.apache.gobblin.data.management.copy.CopyableGlobDatasetFinder

# target location for copy
data.publisher.final.dir=${to}
gobblin.dataset.pattern=${from}

data.publisher.type=org.apache.gobblin.data.management.copy.publisher.CopyDataPublisher
source.class=org.apache.gobblin.data.management.copy.CopySource
writer.builder.class=org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder
converter.classes=org.apache.gobblin.converter.IdentityConverter

# =======================================
# Job Parameters to be resolved using SpecExecutor properties
# =======================================
type=${specExecInstance.job.type}

job.jars=/usr/local/gobblin/lib/*
job.lock.enabled=false # 需要zookeeper的支持 最好设置成false;若是配置gobblin-yarn时需要指定为true 指定zookeeper集群地址
job.class=${specExecInstance.job.launcher.class} 
# Gobblin Hadoop Parameters
#launcher.type=${specExecInstance.job.launcher.type} #想让mapreduce在hadoop集群的yarn执行 最好不要设置该参数

4.测试

bin/gobblin-mapreduce.sh --conf configdir/gobblin-distcp.conf --workdir /user/dalan/tests/gobblin_workdir

输出结果:logs/gobblin-current.log

二。gobblin与Azkaban整合

1.azkaban配置内容调整:目录/data/azkaban/exec-server/plugins/jobtypes

a。common.properties 添加:

gobblin.home=/usr/local/gobblin

b。commonprivate.properties 添加:

gobblin.home=/usr/local/gobblin

c. gobblin/private.properties 添加:

  jobtype.classpath=${gobblin.home}/lib/*
  job.hdfs.jars=/resources/gobblin #指定job上传jar到hdfs指定路径
  jobtype.class=azkaban.jobtype.connectors.gobblin.GobblinHadoopJob

d.其他设置项:【在job文件中设置】

 classpath=/usr/local/gobblin/lib/*  #gobblin的jar路径 用于上传到hdfs上
 gobblin.work_dir=/user/adanalysis/test/gobblin_workdir   #指定gobblin的workdir 在hdfs上地址
 job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher #Azkaban调度类
 source.timezone=Asia/Shanghai #时区

2.测试【采用在指定机器上执行:useExecutor=3,用户需要管理员】

# jar library
# 设置gobblin jar library
framework.jars=/usr/local/gobblin/lib/*
#job.jars=/usr/local/gobblin/lib/*
#classpath=/usr/local/gobblin/lib/*
# jam args
Xmx=3G
Xms=3G
# jvm设置
jvm.args=-Xmn1g -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+UnlockExperimentalVMOptions -XX:+G1SummarizeConcMark -XX:MaxGCPauseMillis=100 -XX:-ResizePLAB -XX:+ParallelRefProcEnabled -XX:+AlwaysPreTouch -XX:ParallelGCThreads=24 -XX:ConcGCThreads=16 -XX:G1HeapWastePercent=3 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1MixedGCLiveThresholdPercent=85 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps
[user.to](http://user.to).proxy=adanalysis
# gobblin framework

#设置gobblin work dir【由于采用的mr,该地址必须是hdfs上可读写地址】
gobblin.work_dir=/user/adanalysis/test/gobblin_workdir

# job args
type=gobblin
[job.name](http://job.name)=mysqlToHive_mysqlToHiveExtract
job.group=demos
job.description="a simple mysql2hive demo"

# 指定mr地址
mr.job.root.dir=${gobblin.work_dir}

# 指定launcher方式

job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher

# 取消lock 需要zookeeper支持
job.lock.enabled=false

#job.commit.parallelize=true
#job.commit.parallelCommits=20

# 指定hdfs namenode地址

fs.uri=[hdfs://](hdfs://nswx)127.0.0.1:9000

#local.work_dir=/usr/local/gobblin
# goblin path args
writer.fs.uri=[hdfs://](hdfs://nswx)127.0.0.1:9000
writer.staging.dir=${gobblin.work_dir}/staging
writer.output.dir=${gobblin.work_dir}/output

state.store.fs.uri=[hdfs://](hdfs://nswx)127.0.0.1:9000
state.store.dir=${gobblin.work_dir}/state

metrics.log.dir=${gobblin.work_dir}/metrics

# source args
source.class=org.apache.gobblin.source.extractor.extract.jdbc.MysqlSource
source.timezone=Asia/Shanghai
# Source properties
source.entity=wk_ad_income_daily_report
source.querybased.schema=adxbe
source.max.number.of.partitions=20

# Source connection properties
source.conn.driver=com.mysql.jdbc.Driver
source.conn.host=127.0.0.1
source.conn.username=user
source.conn.password=password
source.conn.port=3306
source.conn.timeout=500000

#指定源数据开始位置:watermark

source.querybased.watermark.type=timestamp
source.querybased.start.value=1541411045000

# Extract properties
extract.namespace=${source.querybased.schema}
extract.table.type=snapshot_only
# Property to consider the extract as full dump
# 全量抽取
extract.is.full=true
# Converter properties - Record from mysql source will be processed by the below series of converters
converter.classes=org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter

# date columns format
converter.avro.timestamp.format=yyyy-MM-dd HH:mm:ss'.0'
converter.avro.date.format=yyyy-MM-dd
converter.avro.time.format=HH:mm:ss

# Qualitychecker properties
qualitychecker.task.policies=org.apache.gobblin.policies.count.RowCountPolicy,org.apache.gobblin.policies.schema.SchemaCompatibilityPolicy
qualitychecker.task.policy.types=OPTIONAL,OPTIONAL
qualitychecker.row.policies=org.apache.gobblin.policies.schema.SchemaRowCheckPolicy
qualitychecker.row.policy.types=OPTIONAL
qualitychecker.row.err.file=${gobblin.work_dir}/rowerr

# writer
writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder
writer.destination.type=HDFS
#writer.file.path=output
writer.output.format=AVRO

# Publisher properties
data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
data.publisher.final.dir=${gobblin.work_dir}/final
data.publisher.replace.final.dir=true

三。其他

  1. gobblin配置参数
  2. gobblin on yarn
  3. gobblin架构组件
  4. gobblin源码
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容