Spark学习

Spark学习

一、Spark简介

  • 1.Hadoop中Map-Reduce计算框架的替代品
  • 2.运行在HDFS上,可以与Yarn配合
  • 3.将中间结果保存在内存而不是磁盘中
  • 4.提供了比Map、Reduce更多的高阶函数
  • 5.提供了Scala、Python、Java的API以及Scala和Python的shell

二、Spark内置库

  • 1.Spark Streaming 流式数据
  • 2.SparkSQL JDBC API
  • 3.MLlib 机器学习
  • 4.GraphX 图

三、RDD

  • 弹性分布式数据集
  • transform RDD集合 -> RDD集合 变换函数
  • action RDD集合 -> 单个值 行动操作

四、使用Spark的方式

  • 1.私人机器上安装
    • (1) standalone
    • (2) with Mesos
    • (3) with Yarn
  • 2.使用cloudera等公司的虚拟机镜像
  • 3.DataBricks
  • 4.使用官网提供的脚本在AWS的EC2上构建Spark环境
  • *.这里可以下载python2.7,包含大多数常用的科学计算和数据分析库,330M

五、配置Spark环境

软件 版本
操作系统 Mint-16-64bit
Hadoop 2.6.0
Spark 1.4.0
Scala 2.11.6
模式 Spark on Yarn [Cluster]
  • 1.下载Spark,并解压到目录下
$ tar -xzvf spark-1.4.0.tar.gz
$ sudo chmod 777 -R spark-1.4.0/
$ sudo mv spark-1.4.0/ /usr/
  • 2.添加环境变量
$ sudo vi /etc/profile

#添加以下三行
export HADOOP_CONF_DIR=$HADOP_HOME/etc/hadoop
export SPARK_HOME=/usr/spark-1.4.0/
export PATH="$PATH:$SPARK_HOME"
  • 3.修改配置文件
$ cd /usr/spark-1.4.0/conf

$ sudo vi slaves
#添加worker节点
node

$ sudo cp log4j.properties.template log4j.properties

$ sudo cp spark-defaults.conf.template spark-defaults.conf
$ sudo vi spark-defaults.conf
#添加以下几行
[
spark.yarn.am.waitTime 10
spark.yarn.submit.file.replication 0
spark.yarn.preserve.staging.files false
spark.yarn.scheduler.heartbeat.interval-ms 5000 spark.yarn.max.executor.failures 6
spark.yarn.historyServer.address node:10020
spark.yarn.executor.memoryOverhead 512
spark.yarn.driver.memoryOverhead 512
]


$ sudo cp spark-env.sh.template spark-env.sh
$ sudo vi spark-env.sh
#添加以下几行
[
export SCALA_HOME=/usr/scala-2.11.6
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/

#standalone
SPARK_MASTER_IP=node
SPARK_WORKER_MEMORY=512M

#yarn
export HADOOP_HOME=/usr/hadoop-2.6.0
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
SPARK_EXECUTOR_INSTANCES=1
SPARK_EXECUTOR_CORES=1
SPARK_EXECUTOR_MEMORY=256M
SPARK_DRIVER_MEMORY=256M
SPARK_YARN_APP_NAME="Spark 1.4.0"
]
  • 4.启动Spark

确认Hadoop已经在运行

$ cd /usr/spark-1.4.0/sbin
$ ./start-all.sh

运行后执行jps命令,应该出现master和worker两个进程


jps
  • 5.测试

(1) 运行示例程序

$ cd /usr/spark-1.4.0/bin
$ run-example SparkPi

(2) 以Yarn-Client模型运行示例程序

$ cd /usr/spark-1.4.0/bin 

#yarn-cluster模式
spark-submit --class org.apache.spark.examples.JavaSparkPi --master yarn-cluster --driver-memory 256m  --executor-memory 256m --executor-cores 1 ../lib/spark-examples-1.4.0-hadoop2.6.0.jar 10

#standalone模式
spark-submit --class org.apache.spark.examples.SparkPi --master local --driver-memory 128m --executor-memory 128m --executor-cores 1 /usr/spark-1.4.0/lib/spark-examples-1.4.0-hadoop2.6.0.jar 10
运行结果
运行结果

(3) spark-shell测试HDFS和Scala

$ cd /usr/spark-1.4.0/bin
$ spark-shell

# Wordcount for spark
val file=sc.textFile("hdfs://node:8020/tmp/2.txt")
val count=file.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey(_+_)
count.collect()
count.saveAsTextFile("hdfs://node:8020/output")

(4) 出现的问题

  • 1.spark-shell进程经常死掉

错误信息:
./spark-shell: 行 54: 5564 已杀死 "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "$@"

六、配置和使用Spark开发环境

  • 1.下载Intellij IDEA14
  • 2.下载Scala[插件]for intellij(https://confluence.jetbrains.com/display/SCA/)
  • 3.开发环境下安装Scala
  • 4.打开Intellij,新建Scala工程,对于依赖包比较简单的工程,选择Non-SBT类型;在工程中建立[Scala class]->[Object],添加Scala.jar和Spark-assembly-hadoop-*.jar包依赖
  • 5.确认开发环境所使用的JDK版本与Spark集群相兼容
  • 6.打开工程设置,新建[artifact]->[jar from dependencies],选择artifact使用的类,这里可以不将依赖包包含在jar文件中,前提是在Spark集群中设置依赖包的Classpath
  • 7.build,并将jar包传输到Spark集群中
  • 8.执行
#输入以下命令:standalone模式运行
spark-submit --class Your.Class --master local --driver-memory 128m  --executor-memory 128m --executor-cores 1 /path-to/Your.jar

七、Spark读取二进制文件

使用SparkContext的binaryFiles方法读取二进制文件:
源码位于testSpark/loadBinary/loadBinary.java,输入以下命令

#输入以下命令
spark-submit --class main.Scala.loadBinary.loadBinary --master local --driver-memory 128m  --executor-memory 128m --executor-cores 1 /home/zhy/spark-app/testSpark.jar

八、Spark + Kafka + Stream

软件 版本
Kafka 0.8.2.1-scala-2.11
Zookeeper 3.4.6
  • 1.配置Zookeeper
$ tar -zxvf zookeeper-3.4.6.tar.gz 
$ sudo mv zookeeper-3.4.6/ /usr/
$ cd /usr/zookeeper-3.4.6/conf
$ cp zoo_sample.cfg zoo.cfg
$ cd /usr/zookeeper-3.4.6/bin
$ ./zkServer.sh start
  • 2.配置Kafka
$ tar -zxvf kafka-0.8.2.1.tgz
$ sudo mv kafka-0.8.2.1/ /usr/
$ cd /usr/kafka-0.8.2.1
$ bin/kafka-server-start.sh config/server.properties &
运行Kafka后进程
  • 3.连接Spark与Kafka

编译后执行下面的们命令:

spark-submit  --master local --driver-memory 128m  --executor-memory 128m --executor-cores 1 --jars /home/zhy/spark-lib/zkclient-0.5.jar /home/zhy/spark-app/testSpark.jar

错误信息:

 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMe                                          thodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:143)
        at kafka.consumer.Consumer$.create(ConsumerConnector.scala:94)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:125)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:109)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:308                                          )
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:300                                          )
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

错误信息分析:可能是由于开源组件版本的兼容性问题引起的

九、Spark + Streaming

十、启动Hadoop与Spark集群

  • 1.打开虚拟机,并确定虚拟机和主机能够Ping通,主要用来确认虚拟机OS获得了有效的IP地址,其次也保证了主机能够通过SSh登录虚拟机
#1.在虚拟机中输入下面一行命令,以获取虚拟机IP地址
$ ifconfig

#2.在windows中打开Cmd命令行,输入下面一行命令,其中yourIPAddress为ifconfig命令显示的IP地址
$ ping yourIPAddress

#3.确认能够ping通
  • 2.使用如下命令修改虚拟机hosts文件与当前IP地址对应
$ ifconfig
$ sudo vi /etc/hosts
#修改172.20.10.4 node这一行为下面引号内的内容(不含引号),其中yourIPAddress为ifconfig命令显示的与主机能够相互ping通的IP地址:
"yourIPAddress node"
  • 3.使用如下命令启动Hadoop:
$ cd /usr/hadoop-2.*.*/sbin
$ ./start-all.sh
  • 4.使用如下命令启动Spark:
$ cd /usr/spark-1.4.0/sbin
$ ./start-all.sh
  • 5.使用jps命令确认HadoopSpark启动成功:

    jps

  • 6.使用Spark示例确认Spark集群能够工作

#运行示例程序
$ cd /usr/spark-1.4.0/bin
$ run-example SparkPi

#通过spark-submit运行示例程序
$ cd /usr/spark-1.4.0/bin
$ spark-submit --class org.apache.spark.examples.SparkPi --master local --driver-memory 128m --executor-memory 128m --executor-cores 1 /usr/spark-1.4.0/lib/spark-examples-1.4.0-hadoop2.*.*.jar 10
  • 7.提交jar命令格式
$ cd /usr/spark-1.4.0/bin
# --class  指定运行的类 
# --master 指定运行方式
# --driver-memory 指定为该task分配的driver内存
# --executor-memory 指定为该task分配的executor内存
# --executor-cores 指定为该task分配的executor运行核数
# ***.jar 最后一个参数是jar包的位置,之后的参数都作为task的参数传入
# arg0 arg1 可选 task的参数

$ spark-submit --class YourClass --master local --driver-memory 128m --executor-memory 128m --executor-cores 1 ***.jar arg0 arg1

十一、Client 远程执行Spark任务

对于Windows开发环境,远程执行Spark任务需要以下步骤:

  • 1.下载SSH客户端

下载一个Windows的SSH客户端,这里选择的是MobaXterm,其便携版下载地址如下:

[下载地址] -> http://mobaxterm.mobatek.net/MobaXterm_v7.7.zip

下载后解压即可使用,界面是这样的:

mobaxterm
mobaxterm
  • 2.连接Spark集群的Master节点

(1) 在MobaXterm软件中点击左边的Session侧边栏,在"Saved Session"文字上点击右键,在弹出菜单中点击"New Session",进入如下界面:

mobaxterm2
mobaxterm2

(2) 点击SSH,在Remote Host中填入节点的IP地址,勾选"specify usernam"并填入用户名,点击"OK"即可。

mobaxterm3
mobaxterm3

此时会自动尝试SSH连接,输入密码即可连接成功。下图左边为MobaXterm自带的可视化ftp工具,右边为SSH命令行:

mobaxterm4
mobaxterm4
  • 3.上传任务所需Jar包

进入ftp中希望上传的文件夹,点击下图红框中的按钮即可选择要上传的文件并上传:

mobaxterm5
mobaxterm5
  • 4.执行Jar任务

在右边命令行中以类似如下命令格式执行Spark任务:

$ spark-submit --class YourClass --master local --driver-memory 128m --executor-memory 128m --executor-cores 1 ***.jar arg0 arg1
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容