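Modify the HDP YARN configuration
- Log in to Ambari and open the 'Advanced' page under 'Ambari -> YARN -> configs'
- Find 'add custom property for yarn-site' at the bottom of the page
- Click 'add property' and add 'hdp.version' together with the version number
- Save the configuration and restart the affected services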
NOTE: If this property is not configured, you will hit the "bad substitution" error running Spark on Yarn. [ref]
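The value entered for 'hdp.version' is the full stack version string (the properties file further down uses 2.6.1.0-129). A minimal sketch for looking it up on a cluster node, assuming the stack is installed as versioned directories under /usr/hdp; `list_hdp_versions` is a hypothetical helper name, not an HDP tool:

```shell
# Sketch: list HDP stack versions installed under a root directory.
# Assumption: versions live in directories named like 2.6.1.0-129 under
# /usr/hdp; pass a different root as the first argument if yours differs.
list_hdp_versions() {
    for dir in "${1:-/usr/hdp}"/*/; do
        name=$(basename "$dir")
        # Keep only names shaped like <a>.<b>.<c>.<d>-<build>; this skips "current".
        if printf '%s\n' "$name" | grep -Eq '^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+-[0-9]+$'; then
            printf '%s\n' "$name"
        fi
    done
}
```

Calling `list_hdp_versions` with no argument scans /usr/hdp. If more than one version is printed, use the one the running services belong to.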
Environment variables
Add the following settings to the environment on the cluster:
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HBASE_CONF_DIR=/etc/hbase/conf
export CLASSPATH=$HADOOP_CONF_DIR:$HBASE_CONF_DIR
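A wrong path in these variables tends to surface only later as a connection failure, so it can help to check the directories up front. The following is a sketch under the assumption that each directory contains its usual site file (core-site.xml for Hadoop, hbase-site.xml for HBase); `check_conf_dir` is a hypothetical helper:

```shell
# Sketch: report whether a configuration directory contains the expected file.
# check_conf_dir is a hypothetical helper, not part of Hadoop or HBase.
#   $1 = configuration directory, $2 = file expected inside it
check_conf_dir() {
    if [ -f "$1/$2" ]; then
        echo "ok: $1/$2"
    else
        echo "missing: $1/$2" >&2
        return 1
    fi
}

# Example calls, left commented out so that sourcing this file has no effect:
# check_conf_dir "$HADOOP_CONF_DIR" core-site.xml
# check_conf_dir "$HBASE_CONF_DIR" hbase-site.xml
```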
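Add the required jars to $JANUSGRAPH_HOME/lib
- Add the HDP build of spark-assembly-1.6.3.2.6.1.0-129-hadoop2.7.3.2.6.1.0-129.jar
- Delete the Spark-related jars that were previously in lib
- Distribute the lib folder to the other machines in the cluster
NOTE: The spark-assembly jar added above must match the HDP version. Otherwise you may be unable to access data on HDFS.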
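Which jars count as "Spark-related" is not spelled out above. The sketch below only lists candidates for review and deletes nothing. It assumes the bundled Spark jars are named spark-*.jar, and that spark-gremlin-*.jar (TinkerPop's Spark plugin, which provides SparkGraphComputer) and the newly added spark-assembly jar should stay; `list_bundled_spark_jars` is a hypothetical helper:

```shell
# Sketch: list jars in a lib directory that look like bundled Spark jars.
# Assumptions: Spark's own jars are named spark-*.jar; spark-gremlin-* and
# spark-assembly-* are kept, so they are filtered out of the listing.
#   $1 = lib directory, for example "$JANUSGRAPH_HOME/lib"
list_bundled_spark_jars() {
    for jar in "$1"/spark-*.jar; do
        [ -e "$jar" ] || continue
        case "$(basename "$jar")" in
            spark-gremlin-*) ;;    # TinkerPop plugin, not part of Spark itself
            spark-assembly-*) ;;   # the HDP assembly added in the step above
            *) printf '%s\n' "$jar" ;;
        esac
    done
}
```

Review the printed paths before removing anything, then copy the cleaned lib folder to the other machines.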
Configure $JANUSGRAPH_HOME/conf/hadoop-graph/hadoop-load.properties
#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.inputLocation=./data/grateful-dead.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.jarsInDistributedCache=true
#
# GiraphGraphComputer Configuration
#
giraph.minWorkers=2
giraph.maxWorkers=2
giraph.useOutOfCoreGraph=true
giraph.useOutOfCoreMessages=true
mapred.map.child.java.opts=-Xmx1024m
mapred.reduce.child.java.opts=-Xmx1024m
giraph.numInputThreads=4
giraph.numComputeThreads=4
giraph.maxMessagesInMemory=100000
#
# SparkGraphComputer Configuration
#
spark.master=yarn-client
spark.executor.memory=512m
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.app.name=janusgraph-data-load
spark.app.id=janusgraph-data-load
# Path of the lib folder distributed to the cluster, plus the configuration directories of the related components
spark.executor.extraClassPath=/opt/janusgraph-lib/*:/etc/hadoop/conf:/etc/hbase/conf:/etc/spark/conf
# HDP version
spark.yarn.am.extraJavaOptions=-Dhdp.version=2.6.1.0-129
spark.executor.extraJavaOptions=-Dhdp.version=2.6.1.0-129
spark.driver.extraJavaOptions=-Dhdp.version=2.6.1.0-129
NOTE: The Spark-related settings in read-hadoop.properties are the same as in this file.
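Because the same HDP version appears in three properties and also in the spark-assembly jar name, a quick consistency check can catch a mismatch before a job is submitted. This is a sketch, assuming the jar name ends in the HDP version as in the file name used above; both function names are hypothetical:

```shell
# Sketch: print each distinct -Dhdp.version value set in a properties file.
# Comment lines (starting with #) are ignored.
#   $1 = path to the properties file
hdp_versions_in_props() {
    sed -n -E 's/^[^#]*-Dhdp\.version=([^[:space:]]+).*/\1/p' "$1" | sort -u
}

# Sketch: extract the trailing HDP version from a spark-assembly jar name.
# Assumption: the name ends in ".<a>.<b>.<c>.<d>-<build>.jar".
#   $1 = path or file name of the jar
hdp_version_of_assembly() {
    basename "$1" | sed -n -E 's/.*\.([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+-[0-9]+)\.jar$/\1/p'
}
```

If `hdp_versions_in_props` prints more than one line, or its output differs from `hdp_version_of_assembly`, the versions are out of sync.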
Test
bin/gremlin.sh
         \,,,/
         (o o)
-----oOOo-(3)-oOOo-----
plugin activated: janusgraph.imports
gremlin> :plugin use tinkerpop.hadoop
==>tinkerpop.hadoop activated
gremlin> :plugin use tinkerpop.spark
==>tinkerpop.spark activated
gremlin> :load data/grateful-dead-janusgraph-schema.groovy
==>true
==>true
gremlin> graph = JanusGraphFactory.open('conf/janusgraph-hbase.properties')
==>standardjanusgraph[hbase:[kg-server-96.kg.com, kg-agent-95.kg.com, kg-agent-97.kg.com]]
gremlin> defineGratefulDeadSchema(graph)
==>null
gremlin> graph.close()
==>null
gremlin> if (!hdfs.exists('data/grateful-dead.kryo')) hdfs.copyFromLocal('data/grateful-dead.kryo','data/grateful-dead.kryo')
==>null
gremlin> graph = GraphFactory.open('conf/hadoop-graph/hadoop-load.properties')
==>hadoopgraph[gryoinputformat->nulloutputformat]
gremlin> blvp = BulkLoaderVertexProgram.build().writeGraph('conf/janusgraph-hbase.properties').create(graph)
==>BulkLoaderVertexProgram[bulkLoader=IncrementalBulkLoader,vertexIdProperty=bulkLoader.vertex.id,userSuppliedIds=false,keepOriginalIds=true,batchSize=0]
gremlin> graph.compute(SparkGraphComputer).program(blvp).submit().get()
...
==>result[hadoopgraph[gryoinputformat->nulloutputformat],memory[size:0]]
gremlin> graph.close()
==>null
gremlin> graph = GraphFactory.open('conf/hadoop-graph/read-hbase.properties')
==>hadoopgraph[cassandrainputformat->gryooutputformat]
gremlin> g = graph.traversal().withComputer(SparkGraphComputer)
==>graphtraversalsource[hadoopgraph[cassandrainputformat->gryooutputformat], sparkgraphcomputer]
gremlin> g.V().count()
...
==>808
NOTE: Run the test as the spark user to avoid errors when accessing HDFS.
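One way to guard against starting the console under the wrong account is a small check before launching it. This is a sketch, assuming the account is literally named `spark` on your cluster; `require_user` is a hypothetical helper:

```shell
# Sketch: succeed only if the current user matches the expected account.
# require_user is a hypothetical helper; "spark" as the account name is an
# assumption about the cluster setup.
#   $1 = expected user name
require_user() {
    if [ "$(id -un)" != "$1" ]; then
        echo "current user is '$(id -un)', expected '$1'" >&2
        return 1
    fi
}

# Example, left commented out so that sourcing this file has no effect:
# require_user spark && bin/gremlin.sh
```

On hosts where sudo is configured for it, `sudo -u spark bin/gremlin.sh` starts the console as that user directly.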