一、RDD的创建
1、由一个已经存在的Scala集合创建
2、由外部存储系统的文件创建
包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等。
3、已有的RDD经过算子转换生成新的RDD
三、RDD编程API
1.RDD 的算子分类
- Transformation(转换):根据数据集创建一个新的数据集,计算后返回一个新RDD;例如:一个rdd进行map操作后生了一个新的rdd。
- Action(动作):对rdd 结果计算后返回一个数值value 给驱动程序;例如:collect 算子将数据集的所有元素收集完成返回给驱动程序。
2.Transformation
RDD 中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark 更加有效率地运行。
转换 | 含义 |
---|---|
map(func) | 返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成 |
filter(func) | 返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成 |
flatMap(func) | 类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以 func 应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 (Int, Interator[T]) => Iterator[U] |
union(otherDataset) | 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD |
intersection(otherDataset) | 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD |
distinct([numTasks])) | 对源 RDD 进行去重后返回一个新的 RDD |
groupByKey([numTasks]) | 在一个(K,V)的 RDD 上调用,返回一个(K, Iterator[V])的 RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的 reduce 函数,将相同 key 的值聚合到一起,与 groupByKey 类似,reduce 任务的个数可以通过第二个可选的参数来设置 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序的(K,V)的 RDD |
sortBy(func,[ascending], [numTasks]) | 与 sortByKey 类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的(K,(V,W))的 RDD |
cogroup(otherDataset, [numTasks]) | 在 类 型 为 (K,V) 和 (K,W) 的 RDD 上 调 用 , 返 回 一 个 (K,(Iterable<V>,Iterable<W>))类型的 RDD |
coalesce(numPartitions) | 减少 RDD 的分区数到指定值。 |
repartition(numPartitions) | 重新给 RDD 分区 |
repartitionAndSortWithinPartitions(part itioner) | 重新给 RDD 分区,并且每个分区内以记录的 key 排序 |
3.Action
动作 | 含义 |
---|---|
reduce(func) | reduce 将 RDD 中元素前两个传给输入函数,产生一个新的 return 值, 新产生的 return 值与 RDD 中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回 RDD 的元素个数 |
first() | 返回 RDD 的第一个元素(类似于 take(1)) |
take(n) | 返回一个由数据集的前 n 个元素组成的数组 |
takeOrdered(n, [ordering]) | 返回自然顺序或者自定义顺序的前 n 个元素 |
saveAsTextFile(path) | 将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统。 |
saveAsObjectFile(path) | 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 |
countByKey() | 针对(K,V)类型的 RDD,返回一个(K,Int)的 map,表示每一个 key 对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func |
foreachPartition(func) | 在数据集的每一个分区上,运行函数func |
四、RDD 常用的算子操作
启动 spark-shell进行测试:
spark-shell--master spark://node1:7077
练习1:map、filter
map:对每一个元素进行操作
filter:对每一个元素进行过滤
//通过并行化生成rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//对rdd1 里的每一个元素乘 2 然后排序
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
//过滤出大于等于 5 的元素
val rdd3 = rdd2.filter(_ >= 5)
//将元素以数组的方式在客户端显示
rdd3.collect
练习 2:flatMap
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//将rdd1 里面的每一个元素先切分在压平
val rdd2 = rdd1.flatMap(_.split(" "))
rdd2.collect
练习 3:交集、并集
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求并集
val rdd3 = rdd1.union(rdd2)
//求交集
val rdd4 = rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect
rdd4.collect
练习 4:join、groupByKey
join:将集合中相同的键提取出来,对应的值组成一个元组
join:将集合中相同的键提取出来,对应的值封装到compactBuffer中
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3),("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1),("shuke", 2)))
//求 join
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//求并集
val rdd4 = rdd1 union rdd2
rdd4.collect
//按key 进行分组
val rdd5=rdd4.groupByKey
rdd5.collect
练习 5:cogroup
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("jim", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup 与groupByKey的区别
rdd3.collect
练习 6:reduce
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce 聚合
val rdd2 = rdd1.reduce(_ + _)
rdd2.collect
练习 7:reduceByKey、sortByKey
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3),("kitty", 2),("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3),("shuke", 2), ("kitty", 5))) val rdd3 = rdd1.union(rdd2)
//按key 进行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按 value 的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t=> (t._2, t._1))
rdd5.collect
练习 8:repartition、coalesce
val rdd1 = sc.parallelize(1 to 10,3)
//利用repartition 改变rdd1 分区数
//减少分区
rdd1.repartition(2).partitions.size
//增加分区
rdd1.repartition(4).partitions.size
//利用coalesce 改变rdd1 分区数
//减少分区
rdd1.coalesce(2).partitions.size
注意:repartition可以增加和减少rdd中的分区数,coalesce只能减少 rdd分区数,增加rdd分区数不会生效。