spark三类算子小总结

其实很早之前就想对spark做一下自己的阐述,一直也无奈于不能系统的进行以下自己的解释,现在还是想粗略的说一下我自己对spark的一些认识。

spark相对于mapreduce来说,计算模型可以提供更强大的功能,他使用的是迭代模型,我们在处理完一个阶段以后,可以继续往下处理很多个阶段,而不只是像mapreduce一样只有两个阶段。

spark大致分为这三种算子:

 1、Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Value型的数据。

          在这里,我会将对map、flatMap、glom、union、cartesian(笛卡尔操作)、groupBy、filter、distinct(去重)、subtract这9种算子进行描述。

 2、Key-Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Key-Value型的数据。

            而对于Key-Value的算子,就简单的解释一下mapValues、combineByKey、reduceByKey、partitionBy、cogroup、join、leftOutJoin、rightOutJoin这几类进行我的解释。

    3、Action算子,这类算子会触发SparkContext提交作业。

            针对action算子,foreach、collect、collectAsMap、reduceByKeyLocally、lookup、count、top、

reduce、fold、aggregate。大致就是这几项了。

一、Value数据类型的Transformation算子

1)map

        val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

        //rdd有5个元素,将他们分成3个partition

        val b = a.map(_.length)//导入数据使用parallelize方式

        val c = a.zip(b)

        c .collect

        res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))

        Map是把操作映射到每个values里面去。

       上述示意图为:



2)flatMap

        val a = sc.parallelize(1 to 10, 5)  

        //rdd有10个元素,将1到10分成5个partition

        a.flatMap(1 to _).collect

        res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5,6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3,           4,5, 6, 7, 8, 9, 10)

        //每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出

        sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect

        res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

        flatMap是把一个vlaue变成数组,再打断。

        该实例的操作示意图为:

3)subtract

          val a = sc.parallelize(1 to 9, 3)

          val b = sc.parallelize(1 to 3, 3)

          val c = a.subtract(b)

          c.collect

          res3: Array[Int] = Array(6, 9, 4,7, 5, 8)

          针对这个实例画出示意图:

4)glom

            val a = sc.parallelize(1 to 100, 3)

            a.glom.collect

            res8: Array[Array[Int]] = Array(Array(1, 2, 3,..., 33), Array(34,35,..., 65, 66), Array(67, ..., 100))

        针对这个实例画出示意图:


5)union

         val a = sc.parallelize(1 to 3, 1)

         val b = sc.parallelize(5 to 7, 1)

         (a ++ b).collect

         res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)

        针对这个实例画出示意图:

6)cartesian(笛卡尔操作)

          val x =sc.parallelize(List(1,2,3,4,5))

          val y =sc.parallelize(List(6,7,8,9,10))

          x.cartesian(y).collect

          res0: Array[(Int, Int)] =Array((1,6), (1,7), (1,8), (1,9),(1,10), (2,6), (2,7), (2,8), (2,9),(2,10), (3,6), (3,7), (3,8), (3,9),(3,10), (4,6), (5,6), (4,7), (5,7),

(4,8), (5,8), (4,9), (4,10), (5,9),(5,10))

        针对这个实例画出示意图:


7)groupBy(生成相应的key,相同的放在一起)

         val a = sc.parallelize(1 to 9, 3)

         a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect

         res42: Array[(String, Seq[Int])] =Array((even,ArrayBuffer(2, 4, 6,8)), (odd,ArrayBuffer(1, 3, 5, 7,9)))

        针对这个实例画出示意图:


8)filter

         val a = sc.parallelize(1 to 10, 3)

         val b = a.filter(_ % 2 == 0)

         b.collect

         res3: Array[Int] = Array(2, 4, 6, 8, 10)

        针对这个实例画出示意图:

9)distinct(去重)

        val c =sc.parallelize(List("Gnu", "Cat","Rat", "Dog", "Gnu", "Rat"), 2)

        rdd有6个元素,将这六个元素分成2个partition

        c.distinct.collect//将重复出现的元素使用distinct函数去除再形成数组

        res6: Array[String] = Array(Dog,Gnu, Cat, Rat)

        针对这个实例画出示意图:




二、Key-Value数据类型的Transformation算子

 1)mapValues

        val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

       //RDD有6个元素,分别为dog,lion,cat...,将他们分成2个partition

       val b = a.map(x => (x.length, x))

       b.mapValues("x" + _ + "x").collect

       res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))

针对这个实例画出示意图:


 2)combineByKey

          val a =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)

          val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)

          val c = b.zip(a)

          val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x,(x:List[String], y:List[String]) => x ::: y)

          d.collect

          res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu,rabbit, salmon, bee, bear, wolf)))

          该实例的操作示意图为:


 3)reduceByKey

          val a = sc.parallelize(List("dog", "tiger", "dog", "cat", "dog", "eagle", "cat"), 2)

          //rdd有7个元素,将他们分成2个partition

          val b = a.map(x => (x.length, x))

          b.reduceByKey(_ + _).collect   //使用reduceByKey(_+_)的方式

          res87: Array[(Int, String)] = Array((3,dog), (1,tiger), (2,cat),(1,eagle))

          该实例的操作示意图为:



 4)partitionBy

对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并

 5)cogroup

         对两个RDD中的KV元素,每个RDD相同key中元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同key的元素进行合并

          val a = sc.parallelize(List(1, 2, 1, 3), 1)

          val b = a.map((_, "b"))

          val c = a.map((_, "c"))

          b.cogroup(c).collect

          res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array((2,(ArrayBuffer(b),ArrayBuffer(c))),(3,(ArrayBuffer(b),ArrayBuffer(c))),(1,(ArrayBuffer(b, b),ArrayBuffer(c, c))))

          该实例的操作示意图为:



  6)join

          val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

          val b = a.keyBy(_.length)

          val c =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)

          val d = c.keyBy(_.length)

          b.join(d).collect

          res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))

          该实例的操作示意图为:


  7)leftOutJoin

        将LEFT左边的表名1中的所有记录全部保留,而将右边的表名2中的字段B与表名1.字段A相对应的记录显示出来

          val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

          val b = a.keyBy(_.length)

          val c =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)

          val d = c.keyBy(_.length)

          b.leftOuterJoin(d).collect

          res1: Array[(Int, (String, Option[String]))] = Array((6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), (3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), (8,(elephant,None)))

          该实例的操作示意图为:


  8)rightOutJoin(右外连接)

          val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

          val b = a.keyBy(_.length)

          val c =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)

          val d = c.keyBy(_.length)

          b.rightOuterJoin(d).collect

          res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)), (4,(None,bear)))

          该实例的操作示意图为:


三、Action算子

  1)foreach

           val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)//导入数据使用parallelize方式

           c.foreach(x => println(x + "s are yummy"))//得到一条数据就处理一条数据

           该实例的操作示意图为:


 2)fold

            val a = sc.parallelize(List(1,2,3), 3)

            a.fold(0)(_ + _)

            res59: Int = 6

           针对这个实例画出示意图:

 3)aggregate

            val z = sc.parallelize(List(1,2,3,4,5,6), 2)

            // lets first print out the contents of the RDD with partition labels

            def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {

                iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator

           }

            z.mapPartitionsWithIndex(myfunc).collect

           res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val:3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])

           z.aggregate(0)(math.max(_, _), _ + _)

           res40: Int = 9

           针对这个实例画出示意图:



  4)collect

          val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)

          c.collect  //通过collect算子将两个partition结合成一个

          res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)

          针对这个实例画出示意图:


  5)collectAsMap

            RDD中同一个Key中存在多个Value,那么后面的Value将会把前面的Value覆盖,最终得到的结果就是Key唯一,而且对应一个Value

            val a = sc.parallelize(List(1, 2, 1, 3), 1)

            val b = a.zip(a)

            b.collectAsMap

            res1: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)

           针对这个实例画出示意图:



  6)reduceByKeyLocally

            该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。

            val a =sc.parallelize(List("dog","cat", "owl", "gnu", "ant"), 2)

            val b = a.map(x => (x.length,x))

            b.reduceByKey(_ + _).collect

            res86: Array[(Int, String)] =Array((3,dogcatowlgnuant))

           针对这个实例画出示意图:



  7)lookup

           val a =sc.parallelize(List("dog","tiger", "lion", "cat", "panther", "eagle"), 2)

           val b = a.map(x => (x.length, x))

           b.lookup(5)

           res0: Seq[String] = WrappedArray(tiger, eagle)

           针对这个实例画出示意图:



  8)count

            val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)

            c.count

            res2: Long = 4

           针对这个实例画出示意图:


  9)top 

            val c = sc.parallelize(Array(6, 9, 4,7, 5, 8), 2)

            c.top(2)

            res28: Array[Int] = Array(9, 8)

           针对这个实例画出示意图:


  10)reduce

            val a = sc.parallelize(1 to 100, 3)

            a.reduce(_ + _)

            res41: Int = 5050

           针对这个实例画出示意图:




这篇文章也是写了一个礼拜才出来的成果,本身不是很熟悉,需要一个算子一个算子去理解,有些地方也没有很准确,但也是我理解能力范围之内了,网站上不太能查到有关于这方面的知识,所以绝大部分靠自学,理解不对的地方也请多多包涵啦。对于mapreduce和spark的理解也仅限于参考网上的知识加上自己的一些见解。

-------------------------------------------参考:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html-------------------------------------------

------------------------------------------------------------参考:https://www.cnblogs.com/MOBIN/p/5414490.html#12-----------------------------------------------------------

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容

  • Spark学习笔记 Data Source->Kafka->Spark Streaming->Parquet->S...
    哎哟喂喽阅读 6,602评论 0 51
  • zipWithIndex:首先基于分区索引 然后基于分区内元素索引 第一个元素是第一个分区的第一个元素 最后一个元...
    时待吾阅读 470评论 0 0
  • 1、RDD RDD(Resilient Distributed Dataset弹性分布式数据集)是Spark中抽象...
    青禾ws阅读 641评论 2 3
  • 背景 一年多以前我在知乎上答了有关LeetCode的问题, 分享了一些自己做题目的经验。 张土汪:刷leetcod...
    土汪阅读 12,719评论 0 33
  • 在2017年9月22日的时候我的显微之旅开始了。 在回到家的时候,刚下车我就如闪电一般冲向了家中,当看到显微镜的时...
    暗夜男爵阅读 474评论 0 0