Spark 概述
Support working sets (of data) through RDD
- Enabling reuse & fault‐tolerance
RDD Resilient Distributed Dataset 弹性分布式数据集
源码显示:
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging
1)RDD是一个抽象类
2)泛型的,可以支持多种类型: String、Person、User
RDD represents an immutable,partitioned collection of elements that can be operated on in parallel
简单来说, RDD是Spark最基本的数据抽象,它是只读的、分区记录的集合,支持并行操作。
Propeties of RDD
Internally, each RDD is characterized by 5 main properties:
- A list of partitions 一个 RDD 由一个或者多个分区组成,用户可以在创建 RDD 时指定其分区个数,如果没有指定,则默认采用程序所分配到的 CPU 的核心数
- A function for computing each split/partition 对一个RDD执行一个函数,就是对于一个RDD的所有分区执行这个函数
- A list of dependencies on other RDDs 保存彼此间的依赖关系,子RDD丢失了可以从父RDD中重新得到
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)通过分区器决定数据被存储在哪个分区,目前Spark支 HashPartitioner和 RangeParationer
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) 优先把作业调度到数据所在的节点进行计算:移动数据不如移动计算
另外后续会提到关于分区:
one task on per partition --- 对于每一个分区,其都有一个task去处理
one partition could be persisted --- 对于每一个分区,其都可以被持久化
源码体现特性:
def getPartitions: Array[Partition] 特性一
def compute(split: Partition, context: TaskContext): Iterator[T] 特性二
def getDependencies: Seq[Dependency[_]] = deps 特性三
val partitioner: Option[Partitioner] = None 特性四
def getPreferredLocations(split: Partition): Seq[String] = Nil 特性五
RDD 创建方式
1.Parallelized Collections 已有的集合创建
命令行运行
到 sprak的 bin目录下 pyspark开启shell操作
>>> sc # 命令行运行中系统会自动创建一个SparkContext 名称为sc
<SparkContext master=local[*] appName=PySparkShell>
>>> sc.getConf() # sc对应的SparkConf
<pyspark.conf.SparkConf object at 0x7f26237eeba8>
>>> data = [1,2,3,4,5]
>>> dataRDD = sc.parallelize(data)
>>> dataRDD.collect()
[1, 2, 3, 4, 5]
>>> dataRDD2 = sc.parallelize(data,5) # 把data切成五份 5 个partition
# 运行一个partition有1个task,言下之意对应有5个partition的data,有5个task来执行
>>> dataRDD2.collect()
[1, 2, 3, 4, 5]
WebUI上查看(4040端口)信息:http://hadoop000:4040
2.External Datasets 外部存储资源读取,例如本地文件系统,HDFS,HBase 或支持 Hadoop InputFormat 的任何数据源。
#本地上 读本地文件 输入的参数路径格式: file://xxx
sc.textFile('file:///home/hadoop/data/hello.txt').collect()
如果是本地操作的话,文件必须要能够被这个工作的节点访问得到。
支持目录路径,支持压缩文件,支持使用通配符
textFile(dir) textFile(*.txt) textFile(*.gz)
textFile:其返回格式是 RDD[String] ,返回的是就是文件内容,RDD 中每一个元素对应一行数据
#hadoop集群上读取 输入的参数路径格式:hdfs://xxx
>>> sc.textFile("hdfs://hadoop000:8020/test/hello.txt").collect()
#还可以使用wholeTextFiles进行读取
>>> sc.wholeTextFiles('file:///home/hadoop/data/hello.txt').collect()
[('file:/home/hadoop/data/hello.txt', 'hello spark\nhello pyspark\nhello sparksql\n')]
返回一个list list中每一个元素是一个tuple,第一个是全路径,第二个是内容
wholeTextFiles:其返回格式是 RDD[(String, String)],元组中第一个参数是文件路径,第二个参数是文件内容
从HDFS上读取文件时,Spark默认每个块设置为一个分区
RDD 算子
- transformations
由于RDD是不可变的集合,所以需要通过转换从现有数据集创建新数据集
常用操作
1.map(func): 将func函数作用到数据集的每一个元素上,生成一个新的分布式的数据集返回
2.filter(func):选出所有func返回值为true的元素,生成一个新的分布式的数据集返回
3.flatMap(func) 输入的item能够被map到0或者多个items输出,返回值是一个Sequence
4.groupByKey():把相同的key的数据分发到一起 返回的每一个item (key, ResultIterable)
5.reduceByKey(): 把相同的key的数据分发到一起并进行相应的计算
6.union
7.distinct
8.join 默认为innerjoin,可以使用leftjoin rightjoin, fulljoin
一个简单计算wordcount的写法
sc.textFile("file:///home/hadoop/data/hello.txt").flatMap(lambda line: line.split("\t")).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b).collect()
- actions
action操作将数据集上运计算后将得到的值返回到driver program或者写到external storage中去
常见操作
1. collect() 输出 类似于print
2. take(n) 取前几个
3. max() min() sum() count()
4. reduce(func)
5. saveAsTextFile()
6. foreach() 对每一个元素进行操作
- transformations are lazy execution, nothing actually happens until an action is called;
- action triggers the computation;
- action returns values to driver or writes data to external storage;
Spark的使用
from pyspark import SparkConf,SparkContext
# 1.创建SparkConf:设置Spark相关的参数信息
conf = SparkConf().setMaster("local[2]").setAppName("PySpark")
# 2.创建SparkContext
sc = SparkContext(conf=conf)
## 3.业务逻辑 RDD创建和操作
"""
xxxxxxxxx
"""
# 4.清除上下文
sc.stop()
Spark运行模式
- local (开发过程使用)
一般使用的参数有
--master , --name , --py-files
提交作业
./bin/spark-submit --master local[2] -- master url
--name spark-local -- app name
/home/hadoop/script/test.py -- py file
file:///home/hadoop/data/hello.txt -- 参数
file:///home/hadoop/wc/output
使用场景:一般是取小部分数据 通过 spark-submit --master local[*] xx.py [args]
在本地跑一跑看看效果
local模式不需要对spark进行任何配置,但是standalone需要进行相应的配置,standalone模式下有worker和master进程
1.配置slaves
$SPARK_HOME/conf/slaves
hadoop000
hadoop001 ...
2.启动spark集群
$SPARK_HOME/sbin/start-all.sh
ps: 建议在spark-env.sh中添加JAVA_HOME,否则有可能报错
检测: jps 查看master和worker进程是否启动成功
提交作业
./bin/spark-submit --master spark://hadoop000:7077 -- master url
--name spark-standalone -- app name
/home/hadoop/script/test.py -- py file
hdfs://hadoop000:8020/wc.txt -- 参数
hdfs://hadoop000:8020/wc/output
注意:使用standalone模式,而且节点个数大于1的时候,
使用本地文件测试,必须要保证每个节点上都有本地测试文件
spark://hadoop000:7077 表示spark提交作业的端口
WebUI地址端口: 8080
- yarn
yarn 和 standalone的比较:
https://www.runexception.com/q/3546
yarn: 只需要一个节点,然后提交作业即可,不需要spark集群的(不需要启动master和worker),spark 做为一个应用程序(application) 的方式运行在 yarn 上
standalone:spark集群上每个节点都需要部署spark,需要启动spark集群(需要master和worker)
提交作业
./spark-submit --master yarn
--name spark-yarn
/home/hadoop/script/test.py
hdfs://hadoop000:8020/wc.txt
hdfs://hadoop000:8020/wc/output
Exception in thread "main" java.lang.Exception:When running with master
'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment
spark 想要跑在yarn 上必须要知道HDFS 和 yarn 的信息,不然 spark无法找到yarn
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which
contains the (client side) configuration files for the Hadoop cluster.
These configs are used to write to HDFS and connect to the YARN ResourceManager.
The configuration contained in this directory will be distributed to the YARN cluster
so that all containers used by the application use the same configuration.
部署模式 deploy-mode: client cluster: 决定driver启动的位置(driver运行在哪)
client: driver运行在本地,提交作业的进程是不能停止,否则作业就挂了
cluster:提交完作业,那么提交作业端就可以断开了,因为driver是运行在application master中
注意:Cluster deploy mode is not applicable to Spark shells cluster的部署模式不能在交互式的pyspark shell中实现
Spark 架构
Application:基于Spark的应用程序 = 1 driver + executors
Driver:Application的 main() 方法的进程,并创建 SparkContext
Cluster Manager:集群资源管理 例如 spark-submit **--master local[2]** / **spark://hadoop000:7077/yarn** 这些就是集群的管理
Deploy mode:区分driver运行在哪,cluster模式,运行在集群里面,client模式,运行在集群外面(本地)
Worker node:执行计算任务的工作节点(机器)
Executor:位于工作节点上的进程,负责执行计算任务并且将输出数据保存到内存或者磁盘中
Task:Executor 中的工作单元 driver端发起,发送到worker node上的executor上去执行
Job:并行计算,由多个task构成,一个action对应一个job
Stage:一个stage的边界往往是从某个地方取数据开始,到shuffle的结束
每个Application都有自己的一系列executors的进程,这样的话保证了不同的Applications间的隔离,但同时也导致了不同Applications间数据的不可共享性,如果要共享的话,只能写到外部文件,然后再去访问
Spark 缓存
cache transformation一样,采用lazy execution :没有遇到action是不会提交作业到spark上运行
rdd.cache()
底层使用persist方法:
rdd.cache() = rdd.persist(StorageLevel.MEMORY_ONLY)
如果一个RDD在后续的计算中可能会被使用到,那么建议cache
最底层其实调用的是StorageLevel类
Spark automatically monitors cache usage on each node
and drops out old data partitions in a least-recently-used (LRU) fashion.
# 手动删除cache
rdd.unpersist() # unpersist是一个立即执行的操作,不是lazy execution
- 缓存策略的选择:权衡memory usage and CPU efficiency 一句话:memory 但副本->memory_ser 单副本 -> 硬盘 ->多副本
- 能用MEMORY_ONLY搞定就用默认的MEMORY_ONLY(most CPU-efficient option)
- 内存不够的话,尝试使用MEMORY_ONLY_SER并且选择一个比较快的序列化库(省空间,但是反序列化开销CPU时间,且仅仅Java和Scala使用)
Spark Lineage 机制
Lineage: RDD间的依赖关系
窄依赖:一个父RDD的partition至多被子RDD的某个partition使用一次(pipline-able)
-
宽依赖:一个父RDD的partition会被子RDD的partition使用多次,有shuffle操作
有n个stage对应了n+1个shuffle
Spark shuffle
xxx
Spark 优化
内存管理 execution & storage
execution memory refers to that used for computation in shuffles, joins, sorts, and aggregations,
storage memory refers to that used for caching and propagating internal data across the cluster广播变量 (20KB)
每个 Task会有自由变量的副本,如果变量很大且 Task 任务很多的情况下,这必然会对网络 IO 造成压力
广播变量:就是不把副本变量分发到每个 Task 中,而是将其分发到每个 Executor,Executor 中的所有 Task 共享一个副本变量。
sc.broadcast(data)
数据本地化 Data locality:how close data is to the code processing it 数据和代码有多近
it is faster to ship serialized code from place to place than a chunk of data because code size is much smaller than data
移动数据不如移动计算(代码)
类型:
PROCESS_LOCAL:同一个JVM下
NODE_LOCAL:同一个节点
NO_PREF:所有地方都一样
RACK_LOCAL:同一个机架
ANY:不同机架