介绍以下Transformations算子:
aggregateByKey
join
cogroup
cartesian
pipe
repartitionAndSortWithinPartitions
glom
randomSplit
zip
zipWithIndex
zipWithUniqueId
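(1) aggregateByKey(zeroValue)(seqOp, combOp):先在每个分区内以zeroValue为初始值,用seqOp把同一个key的value逐个聚合;再用combOp把各分区中同一个key的中间结果合并。注意zeroValue会在每个分区的每个key上各参与一次计算,因此结果与分区方式有关。下例中seqOp取最大值、combOp求和:data2中key=3分布在两个分区,分区内结果分别为max(1,1)=1和max(1,2,3)=3,合并后得到1+3=4。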
object aggregateByKeyTest {
  /** seqOp: folds one value into the partition-local accumulator of a key, keeping the larger one. */
  def seq(a: Int, b: Int): Int = math.max(a, b)

  /** combOp: merges the per-partition accumulators of the same key by summing them. */
  def comb(a: Int, b: Int): Int = a + b

  /**
   * Demonstrates that aggregateByKey results depend on partitioning: seqOp (max) runs inside
   * each partition starting from the zero value, and combOp (sum) merges partitions.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("aggregateByKeyTest").setMaster("local")
    val sc = new SparkContext(conf)
    // Always release the SparkContext, even if one of the jobs below fails.
    try {
      // No explicit partition count: presumably a single partition under master "local"
      // (the output matches that), so combOp never has anything to merge.
      val data = sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3)))
      data.aggregateByKey(1)(seq, comb).foreach(println)
      /*
      (1,4)
      (2,3)
      */

      // Two partitions: key 3 appears in both, yielding max(1,1)=1 and max(1,2,3)=3,
      // which combOp sums to 4.
      val data2 = sc.parallelize(
        List((1, 3), (1, 2), (1, 4), (2, 3), (2, 4), (3, 1), (3, 2), (3, 3), (4, 1), (4, 2), (4, 3), (4, 4)),
        2)
      data2.aggregateByKey(1)(seq, comb).foreach(println)
      /*
      (4,4)
      (2,4)
      (1,4)
      (3,4)
      */
    } finally {
      sc.stop()
    }
  }
}
(2) join
object JoinTest {
  /**
   * Demonstrates the four pair-RDD joins (inner, left outer, right outer, full outer)
   * between employee names and salaries keyed by id. Id 4 ("Tony") has no salary entry,
   * which is what makes the outer joins differ from the inner join.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JoinTest").setMaster("local")
    val sc = new SparkContext(conf)
    // Always release the SparkContext, even if one of the jobs below fails.
    try {
      val nameList = List(
        (1, "Jed"),
        (2, "Tom"),
        (3, "Bob"),
        (4, "Tony")
      )
      val salaryArr = Array(
        (1, 8000),
        (2, 6000),
        (3, 5000)
      )
      val nameRDD = sc.parallelize(nameList, 2)
      val salaryRDD = sc.parallelize(salaryArr, 3)

      // inner join: only ids present in both RDDs; values are (name, salary)
      val joinRDD = nameRDD.join(salaryRDD)
      joinRDD.foreach { case (id, (name, salary)) => println(s"$id\t$name\t$salary") }
      /*
      1 Jed 8000
      2 Tom 6000
      3 Bob 5000
      */

      // left join: every name is kept; the salary side is an Option
      val leftOuterJoinRDD = nameRDD.leftOuterJoin(salaryRDD)
      leftOuterJoinRDD.foreach { case (id, (name, salary)) => println(s"$id\t$name\t$salary") }
      /*
      1 Jed Some(8000)
      2 Tom Some(6000)
      3 Bob Some(5000)
      4 Tony None
      */

      // right join: every salary is kept; the name side is an Option
      val rightOuterJoinRDD = nameRDD.rightOuterJoin(salaryRDD)
      rightOuterJoinRDD.foreach { case (id, (name, salary)) => println(s"$id\t$name\t$salary") }
      /*
      1 Some(Jed) 8000
      2 Some(Tom) 6000
      3 Some(Bob) 5000
      */

      // full join: ids from either side are kept; both sides are Options
      val fullOuterJoinRDD = nameRDD.fullOuterJoin(salaryRDD)
      fullOuterJoinRDD.foreach { case (id, (name, salary)) => println(s"$id\t$name\t$salary") }
      /*
      1 Some(Jed) Some(8000)
      2 Some(Tom) Some(6000)
      3 Some(Bob) Some(5000)
      4 Some(Tony) None
      */
    } finally {
      sc.stop()
    }
  }
}
(3) cogroup:将多个RDD中同一个Key对应的Value组合到一起
val data1 = sc.parallelize(List((1, "Good"), (2, "Morning")))
val data2 = sc.parallelize(List((1, "How"), (2, "Are"), (3, "You")))
val data3 = sc.parallelize(List((1, "I"), (2, "Love"), (3, "U")))
// For every key found in any of the RDDs, cogroup emits one tuple holding the values from
// each RDD (in argument order); an RDD without that key contributes an empty collection,
// as key 3 shows for data1 below.
val result = data1.cogroup(data2, data3)
result.foreach(println)
/*
(1,(CompactBuffer(Good),CompactBuffer(How),CompactBuffer(I)))
(2,(CompactBuffer(Morning),CompactBuffer(Are),CompactBuffer(Love)))
(3,(CompactBuffer(),CompactBuffer(You),CompactBuffer(U)))
*/
(4) cartesian:求笛卡尔积
// Each element of the left RDD is paired with each element of the right RDD.
val left = sc.makeRDD(Array(1, 2, 3))
val right = sc.makeRDD(Array(4, 5, 6))
val product = left.cartesian(right)
product.foreach(println)
/*
(1,4)
(1,5)
(1,6)
(2,4)
(2,5)
(2,6)
(3,4)
(3,5)
(3,6)
*/
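(5) pipe(command):对每个分区调用一次外部Shell命令,把分区中的每个元素作为一行写入该命令的标准输入,并把命令标准输出中的每一行作为结果RDD(元素类型为String)的一个元素。例如 sc.parallelize(List("a","b")).pipe("cat") 得到内容相同的字符串RDD。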
(6) repartitionAndSortWithinPartitions:按照给定的分区器(Partitioner)重新分区,并在每个新分区内按key排序。排序在shuffle过程中完成,因此比先repartition再在分区内排序更高效。
val people = Array(
  (1, "Tom"), (18, "Tony"), (23, "Ted"),
  (3, "Harry"), (56, "Bob"), (45, "Jack"),
  (22, "Jed"), (2, "Kobe"), (4, "Kate"),
  (23, "Mary"), (32, "Tracy"), (6, "Allen"),
  (7, "Caleb"), (19, "Alexande"), (9, "Nathan"))
val rdd = sc.makeRDD(people, 2)
// Print every partition's records, each partition preceded by a separator line.
rdd.foreachPartition { records =>
  println("=============")
  records.foreach(println)
}
/*
=============
(1,Tom)
(18,Tony)
(23,Ted)
(3,Harry)
(56,Bob)
(45,Jack)
(22,Jed)
=============
(2,Kobe)
(4,Kate)
(23,Mary)
(32,Tracy)
(6,Allen)
(7,Caleb)
(19,Alexande)
(9,Nathan)
*/
// Redistribute into 4 partitions via HashPartitioner; records end up sorted by key inside each partition.
val repartitioned = rdd.repartitionAndSortWithinPartitions(new HashPartitioner(4))
repartitioned.foreachPartition { records =>
  println("=============")
  records.foreach(println)
}
/*
=============
(4,Kate)
(32,Tracy)
(56,Bob)
=============
(1,Tom)
(9,Nathan)
(45,Jack)
=============
(2,Kobe)
(6,Allen)
(18,Tony)
(22,Jed)
=============
(3,Harry)
(7,Caleb)
(19,Alexande)
(23,Ted)
(23,Mary)
*/
(7) glom:把每个分区中的所有元素封装成一个数组,结果RDD中的每个元素对应原RDD的一个分区(因此下例中count()的结果等于分区数2)。
// Two partitions in, so glom yields an RDD of two arrays (one per partition).
val numbers = sc.parallelize(1 to 10, 2)
val glomRDD = numbers.glom()
glomRDD.foreach { partitionArray =>
  println("============")
  partitionArray.foreach(println)
}
val partitionCount = glomRDD.count()
println("glomRDD中的元素个数为:" + partitionCount)
/*
============
1
2
3
4
5
============
6
7
8
9
10
glomRDD中的元素个数为:2
*/
(8) randomSplit:拆分RDD
val source = sc.parallelize(1 to 10)
// Split the original RDD into 4 RDDs with weights 1:2:3:4, then print the size of each split.
val splits = source.randomSplit(Array(0.1, 0.2, 0.3, 0.4))
for (split <- splits) println(split.count())
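理论结果(各子RDD元素个数的期望值):
1
2
3
4
randomSplit对每个元素按权重独立地随机分配,因此各子RDD的大小只在期望上符合1:2:3:4。在数据量不大的情况下,实际结果通常会偏离理论值,并且每次运行都可能不同。如需可复现的结果,可以通过seed参数固定随机种子。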
(9) zip、zipWithIndex、zipWithUniqueId
package com.aura.transformations
import org.apache.spark.{SparkConf, SparkContext}
/**
* Author: Jed
* Description:
* Date: Create in 2018/1/11
*/
object ZipTest {
  /** Demonstrates zip, zipWithIndex and zipWithUniqueId on two small RDDs. */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ZipTest").setMaster("local")
    val sc = new SparkContext(conf)
    // Always release the SparkContext, even if one of the jobs below fails.
    try {
      val arr = Array(1, 2, 3, 4, 5)
      val arr2 = Array("Tom", "Jed", "Tony", "Terry", "Kate")
      val rdd1 = sc.makeRDD(arr)
      val rdd2 = sc.makeRDD(arr2)

      // zip pairs elements by position. Per Spark's contract both RDDs need the same number
      // of partitions and elements per partition; both are built the same way here.
      rdd1.zip(rdd2).foreach(println)
      /*
      (1,Tom)
      (2,Jed)
      (3,Tony)
      (4,Terry)
      (5,Kate)
      */

      // zipWithIndex pairs each element with its position, starting at 0.
      rdd2.zipWithIndex().foreach(println)
      /*
      (Tom,0)
      (Jed,1)
      (Tony,2)
      (Terry,3)
      (Kate,4)
      */

      // zipWithUniqueId only guarantees unique ids; they come out consecutive here,
      // presumably because master "local" gives a single partition.
      rdd1.zipWithUniqueId().foreach(println)
      /*
      (1,0)
      (2,1)
      (3,2)
      (4,3)
      (5,4)
      */
    } finally {
      sc.stop()
    }
  }
}
原理: