其实很早之前就想对spark做一下自己的阐述,一直也无奈于不能系统的进行以下自己的解释,现在还是想粗略的说一下我自己对spark的一些认识。
spark相对于mapreduce来说,计算模型可以提供更强大的功能,他使用的是迭代模型,我们在处理完一个阶段以后,可以继续往下处理很多个阶段,而不只是像mapreduce一样只有两个阶段。
spark大致分为这三种算子:
1、Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Value型的数据。
在这里,我会将对map、flatMap、glom、union、cartesian(笛卡尔操作)、groupBy、filter、distinct(去重)、subtract这9种算子进行描述。
2、Key-Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Key-Value型的数据。
而对于Key-Value的算子,就简单的解释一下mapValues、combineByKey、reduceByKey、partitionBy、cogroup、join、leftOutJoin、rightOutJoin这几类进行我的解释。
3、Action算子,这类算子会触发SparkContext提交作业。
针对action算子,foreach、collect、collectAsMap、reduceByKeyLocally、lookup、count、top、
reduce、fold、aggregate。大致就是这几项了。
一、Value数据类型的Transformation算子
1)map
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
//rdd有5个元素,将他们分成3个partition
val b = a.map(_.length)//导入数据使用parallelize方式
val c = a.zip(b)
c .collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
Map是把操作映射到每个values里面去。
上述示意图为:
2)flatMap
val a = sc.parallelize(1 to 10, 5)
//rdd有10个元素,将1到10分成5个partition
a.flatMap(1 to _).collect
res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5,6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4,5, 6, 7, 8, 9, 10)
//每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出
sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect
res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
flatMap是把一个vlaue变成数组,再打断。
该实例的操作示意图为:
3)subtract
val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.collect
res3: Array[Int] = Array(6, 9, 4,7, 5, 8)
针对这个实例画出示意图:
4)glom
val a = sc.parallelize(1 to 100, 3)
a.glom.collect
res8: Array[Array[Int]] = Array(Array(1, 2, 3,..., 33), Array(34,35,..., 65, 66), Array(67, ..., 100))
针对这个实例画出示意图:
5)union
val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect
res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)
针对这个实例画出示意图:
6)cartesian(笛卡尔操作)
val x =sc.parallelize(List(1,2,3,4,5))
val y =sc.parallelize(List(6,7,8,9,10))
x.cartesian(y).collect
res0: Array[(Int, Int)] =Array((1,6), (1,7), (1,8), (1,9),(1,10), (2,6), (2,7), (2,8), (2,9),(2,10), (3,6), (3,7), (3,8), (3,9),(3,10), (4,6), (5,6), (4,7), (5,7),
(4,8), (5,8), (4,9), (4,10), (5,9),(5,10))
针对这个实例画出示意图:
7)groupBy(生成相应的key,相同的放在一起)
val a = sc.parallelize(1 to 9, 3)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
res42: Array[(String, Seq[Int])] =Array((even,ArrayBuffer(2, 4, 6,8)), (odd,ArrayBuffer(1, 3, 5, 7,9)))
针对这个实例画出示意图:
8)filter
val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)
b.collect
res3: Array[Int] = Array(2, 4, 6, 8, 10)
针对这个实例画出示意图:
9)distinct(去重)
val c =sc.parallelize(List("Gnu", "Cat","Rat", "Dog", "Gnu", "Rat"), 2)
rdd有6个元素,将这六个元素分成2个partition
c.distinct.collect//将重复出现的元素使用distinct函数去除再形成数组
res6: Array[String] = Array(Dog,Gnu, Cat, Rat)
针对这个实例画出示意图:
二、Key-Value数据类型的Transformation算子
1)mapValues
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
//RDD有6个元素,分别为dog,lion,cat...,将他们分成2个partition
val b = a.map(x => (x.length, x))
b.mapValues("x" + _ + "x").collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))
针对这个实例画出示意图:
2)combineByKey
val a =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x,(x:List[String], y:List[String]) => x ::: y)
d.collect
res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu,rabbit, salmon, bee, bear, wolf)))
该实例的操作示意图为:
3)reduceByKey
val a = sc.parallelize(List("dog", "tiger", "dog", "cat", "dog", "eagle", "cat"), 2)
//rdd有7个元素,将他们分成2个partition
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect //使用reduceByKey(_+_)的方式
res87: Array[(Int, String)] = Array((3,dog), (1,tiger), (2,cat),(1,eagle))
该实例的操作示意图为:
4)partitionBy
对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并
5)cogroup
对两个RDD中的KV元素,每个RDD相同key中元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同key的元素进行合并
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
b.cogroup(c).collect
res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array((2,(ArrayBuffer(b),ArrayBuffer(c))),(3,(ArrayBuffer(b),ArrayBuffer(c))),(1,(ArrayBuffer(b, b),ArrayBuffer(c, c))))
该实例的操作示意图为:
6)join
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)
val d = c.keyBy(_.length)
b.join(d).collect
res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))
该实例的操作示意图为:
7)leftOutJoin
将LEFT左边的表名1中的所有记录全部保留,而将右边的表名2中的字段B与表名1.字段A相对应的记录显示出来
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)
val d = c.keyBy(_.length)
b.leftOuterJoin(d).collect
res1: Array[(Int, (String, Option[String]))] = Array((6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), (3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), (8,(elephant,None)))
该实例的操作示意图为:
8)rightOutJoin(右外连接)
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)
val d = c.keyBy(_.length)
b.rightOuterJoin(d).collect
res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)), (4,(None,bear)))
该实例的操作示意图为:
三、Action算子
1)foreach
val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)//导入数据使用parallelize方式
c.foreach(x => println(x + "s are yummy"))//得到一条数据就处理一条数据
该实例的操作示意图为:
2)fold
val a = sc.parallelize(List(1,2,3), 3)
a.fold(0)(_ + _)
res59: Int = 6
针对这个实例画出示意图:
3)aggregate
val z = sc.parallelize(List(1,2,3,4,5,6), 2)
// lets first print out the contents of the RDD with partition labels
def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
z.mapPartitionsWithIndex(myfunc).collect
res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val:3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])
z.aggregate(0)(math.max(_, _), _ + _)
res40: Int = 9
针对这个实例画出示意图:
4)collect
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect //通过collect算子将两个partition结合成一个
res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
针对这个实例画出示意图:
5)collectAsMap
RDD中同一个Key中存在多个Value,那么后面的Value将会把前面的Value覆盖,最终得到的结果就是Key唯一,而且对应一个Value
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.zip(a)
b.collectAsMap
res1: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)
针对这个实例画出示意图:
6)reduceByKeyLocally
该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。
val a =sc.parallelize(List("dog","cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length,x))
b.reduceByKey(_ + _).collect
res86: Array[(Int, String)] =Array((3,dogcatowlgnuant))
针对这个实例画出示意图:
7)lookup
val a =sc.parallelize(List("dog","tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.lookup(5)
res0: Seq[String] = WrappedArray(tiger, eagle)
针对这个实例画出示意图:
8)count
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.count
res2: Long = 4
针对这个实例画出示意图:
9)top
val c = sc.parallelize(Array(6, 9, 4,7, 5, 8), 2)
c.top(2)
res28: Array[Int] = Array(9, 8)
针对这个实例画出示意图:
10)reduce
val a = sc.parallelize(1 to 100, 3)
a.reduce(_ + _)
res41: Int = 5050
针对这个实例画出示意图:
这篇文章也是写了一个礼拜才出来的成果,本身不是很熟悉,需要一个算子一个算子去理解,有些地方也没有很准确,但也是我理解能力范围之内了,网站上不太能查到有关于这方面的知识,所以绝大部分靠自学,理解不对的地方也请多多包涵啦。对于mapreduce和spark的理解也仅限于参考网上的知识加上自己的一些见解。
-------------------------------------------参考:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html-------------------------------------------
------------------------------------------------------------参考:https://www.cnblogs.com/MOBIN/p/5414490.html#12-----------------------------------------------------------