Spark源码分析(1) RDD是什么

RDD是Spark的基础,是对大数据的抽象,所以先破解Spark,首先从RDD开始。

  • RDD 是什么?有什么特点?
  • RDD 包含什么?
  • RDD 能做什么?

RDD 的注释

org.apache.spark.rdd.RDD 类源代码中有详细的注释:

  • A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    翻译:弹性的 分布式 数据集是 Spark 基础的抽象。
    解释:弹性的(可复原的),说明数据集具有容错性、可修复性。
    分布式,说明数据集可以分布在不同的机器上

  • Represents an immutable, partitioned collection of elements that can be operated on in parallel.
    翻译:RDD 是不可变的 分区的 可并行处理的 元素集合
    解释:不可变的,这和 Scala 的设计理念相同,数据集一旦构建完成,就不能再修改,这样能轻松解决多个线程读数据的一致性问题。
    分区的=可并行处理的=分布式

  • This class contains the basic operations available on all RDDs, such as map, filter, and persist.
    翻译:这个抽象类包含了所有 RDD 都应该有的基本操作,比如 mapfilterpersist
    解释:这三个操作分别是:批量转换、筛选、持久化

  • In addition, [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
    pairs, such as groupByKey and join;
    翻译:另外 PairRDDFunctions 对象中包含了 键值对型(KV型) RDD 的操作,例如 groupByKeyjoin
    解释:KV 型可以支持按 Key 分组、关联等操作

  • [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
    Doubles;
    翻译:DoubleRDDFunctions提供可 double 数据集的操作;
    解释:数值型数据集有求和、平均、分布图等统计性操作

  • and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
    can be saved as SequenceFiles.
    翻译:SequenceFileRDDFunctions 提供了顺序存储操作

  • All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit.
    翻译:所有的的类通过隐式转换自动地用于RDD实例中
    解释:RDD 伴生对象里包含了隐式转换函数,用implicit 修饰。隐式转换是 Scala 的语法特性。

  • Internally, each RDD is characterized by five main properties:

    • A list of partitions
    • A function for computing each split
    • A list of dependencies on other RDDs
    • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
      翻译:在 RDD 中,包含这样的5个属性(也就说要实现抽象方法或给空对象赋值):
    • 一个分区的列表(getPartitions)
    • 一个用于计算分区中数据的函数(compute)
    • 一个对其他 RDD 的依赖列表(getDependencies)
    • 可选:KV 型 RDD 应该有一个分区器,例如 hash-分区器(partitioner)
    • 可选:分区数据计算完后优先存储的位置,例如 HDFS 的某个块(getPreferredLocations)
  • All of the scheduling and execution in Spark is done based on these methods, allowing each RDD to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for reading data from a new storage system) by overriding these functions.
    翻译: Spark 中所有的任务调度、任务执行都依赖于这些方法。RDD 可以覆盖这些方法,实现有自己的计算方法。例如从一个新的存储系统中读取数据。

  • Please refer to the http://101.96.8.165/people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf for more details on RDD internals.
    翻译:更多细节,可以 Spark 的论文

一段示例代码

是的,我们从HelloWorld开始,官方推荐的第一个程序是计算π的近似值:

import scala.math.random
import org.apache.spark.sql.SparkSession

object SparkPi {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("Spark Pi")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y <= 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / (n - 1))
    spark.stop()
  }
}

😱什么,RDD 在哪里?

  1. 获得 Spark 会话 和 Spark 上下文
    val spark = SparkSession.builder.appName("xx").getOrCreate()
    这4行,只能算是1行代码。调用内部的构造器产生了一个Spark 会话对象,赋值给 spark。后面 spark.sparkContext 是获得 Spark 的上下文对象。至于会话对象和上下文对象,以后再分析。

  2. 创建一个 RDD
    spark.sparkContext.parallelize(1 until n, slices)
    SparkContext 对象中有一个parallelize 函数,创建了一个RDD对象。
    RDD是抽象类。进入源码我们可以看到 创建的 RDD 是 ParallelCollectionRDD(字面翻译为并行容器 RDD)。这个RDD是最简单的RDD了。如果是我,我会将它命名为SimpleRDD。
    这句话创建了一个包含slices个分区的 RDD,RDD 的内容是1到 n,这 n+1 个数。数据存在内存中,从内存读分区的数据。

  3. 看看这个 RDD 中的细节
    还记得前一节翻译的文字吗?RDD 应该实现5个方法。这个并行容器 RDD 是怎么实现的呢?

    • 一个分区的列表
      将数据分成slices份,放在slices个容器中。每个容器就是一个分区,所有容器构成了分区列表
    • 一个用于计算分区中数据的函数
      什么都没做,返回分区的迭代器
    • 一个依赖列表
      依赖列表为Nil空列表。即,这个 RDD 不依赖别的 RDD
    • 一个分区器
      不是 KV 型的,不需要
    • 一个运算存储优先位置
      SparkContext传入了一个 Map,Map 有slices个key,对应slices个容器。可见,SparkContext希望结果存在内存中。
  4. map
    map是将分组中每一个元素映射成另一个元素的操作。我们说过,RDD是不可变的,map这个操作产生新MapPartitionsRDD对象。
    那MapPartitionsRDD的5个方法呢?

    • 一个依赖列表:只依赖于上游的 RDD,本例中依赖于上游的ParallelCollectionRDD。
    • 一个分区列表:就是上游分区列表,直接读取上游数据
    • 一个计算:计算过程就是“映射关系”,由外部传入一个函数对象表达映射关系
    • 一个分区器:上游 RDD 的分区器,直接读上游的分区
    • 一个优先存储位置:上游 RDD 的优先位置,本例中直接写到SparkContext传入的 Map
  5. reduce
    reduce 也是一个操作,是多对一的聚合操作,聚合前后类型必须一致。本例中是求和操作。
    过程可以简述成,先计算每个分区的聚合结果,再将多个分区的结果再聚合。过程比较复杂,以后再深入。

  6. 如何计算π?
    random 取随机数,范围是 [0, 1),那么x 和 y 是 [-1, 1)范围内的随机数。
    计算xx+yy,这是点(x, y)到(0, 0) 的距离,当距离不大1(点落在r=1的圆内)时,取1,否则取0。那么随机取 N 个点,点落圆内的几率等于圆的面积/边长为2的正方形的面积。所以:
    圆的面积 ≌ 正方形面积 * 落在圆内的点数 / 所有的点数
    圆的面积=π,正方形面积=4
    根据大数定理和中心极限定理,取的点越多,π的估值越近似于正态分布;取得的点越多,正态分布的标准差越小;取得的点越多,正态分布的均值越接近π的真值。所以,随着取点的增加,π估值约精确。


Scala 语法

1 until n 用到了三个 Scala 语法:

  1. 一切皆对象
    在 Java 中,1会被认为是一个基本类型int,可以装包成对象,在 Scala 中,1 就是一个对象。
  2. 隐式转换
    util是调用 RichInt.util 方法。Int 转换成 RichInt 是隐式的,定义在 scala.Predef对象中的intWrapper方法。scala.Predef类似于宏。
    参考: scala source implicit conversion from Int to RichInt - Stack Overflow
  3. 函数调用的写法
    1 until n等价于1.until(n),也就是说,如果对象方法若只有一个参数,可以省略掉点和括号,这样代码更接近自然语言。

OK,那么1 until n 这句话写全了应该是什么样的呢?
答:scala.this.Predef.intWrapper(1).until(n);


疑问列表

我将阅读过程中的未解内容记录下来,留待以后阅读代码时解答。疑问一个一个划掉,就是成长的过程。

  1. reduce 等 RDD 操作是如何执行的?

总结

  1. RDD 是数据集
  2. RDD 的特点是有弹性、分布式、不可变。
  3. RDD应该包含5个部分:一个分区集、一个依赖集、一个运算、[一个分区器、一个优先结果存储位置]。
  4. RDD 有一系列的操作,包括映射、过滤、聚合、存储等。

本文源码

RDD spark/core/RDD/RDD.scala at master · apache/spark · GitHub
map spark/core/RDD/MapPartitionsRDD at master · apache/spark · GitHub
计算π spark/examples/SparkPi.scala at master · apache/spark · GitHub


@ Kangying Village, Beijing, China

Spark源码/RDD

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,311评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,339评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,671评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,252评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,253评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,031评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,340评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,973评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,466评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,937评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,039评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,701评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,254评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,259评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,497评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,786评论 2 345

推荐阅读更多精彩内容