Spark算子


layout: pospost
title: Spark算子
date: 2017-02-27 16:03:52
tags: [Spark算子,Spark,大数据]
categories: "大数据"


RDD创建操作

  • parallelize

def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
  从一个Seq集合创建 RDD。
参数1:Seq集合,必须。
参数2:分区数,默认为该Application分配到的资源的CPU核数( Spark will run one task for each partition of the cluster. )

scala> val data = Array(1,2,3,4,5,6);
data: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> val num = sc.parallelize(data);
num: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:26

scala> num.collect();
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> val num1 = sc.parallelize(1 to 10);
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> num1.collect();
res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> num1.partitions.size
res3: Int = 4

scala> val num2 = sc.parallelize(1 to 10 ,2);
num2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> num2.partitions.size
res4: Int = 2

  • makeRDD (跳过)

与parallelize一模一样

从外部存储创建RDD

  • textFile

从hdfs文件创建

scala> val textFile = sc.textFile("/user/test/1.txt");
textFile: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[12] at textFile at <console>:24

scala> textFile.collect();
res6: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)

scala> textFile.count();
res7: Long = 3

从其他HDFS文件格式创建 (跳过)

  • hadoopFile
  • sequenceFile
  • objectFile
  • newAPIHadoopFile

从Hadoop接口API创建 (跳过)

  • hadoopRDD
  • newAPIHadoopRDD

RDD基本转换操作(1) –map、flatMap、distinct

数据准备

hadoop@hzbxs-bigdata16:~/hadoop-hp2$ bin/hadoop dfs -copyFromLocal ~/test/1.txt  /user/test/
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

hadoop@hzbxs-bigdata16:~/hadoop-hp2$ cat ~/test/1.txt 
Hello World
Hello Yang
Hello SCALA
hadoop@hzbxs-bigdata16:~/hadoop-hp2$ 
  • map

将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区。

//从HDFS从加载文件
scala> val textFile = sc.textFile("/user/test/1.txt");
textFile: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[13] at textFile at <console>:24
//打印 textFile 内容
scala> textFile.collect();
res7: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)               
// map 算子
scala> val mapdata = textFile.map(line => line.split("\\s+"));
mapdata: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:26
//打印data 内容
scala> mapdata.collect();
res8: Array[Array[String]] = Array(Array(Hello, World), Array(Hello, Yang), Array(Hello, SCALA))
  • flatMap

与map类似,但是最后会将结果合并到一个分区。

scala> val flatmapdata = textFile.flatMap(line => line.split("\\s+"));
flatmapdata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:26

scala> flatmapdata.collect();
res10: Array[String] = Array(Hello, World, Hello, Yang, Hello, SCALA)

  • distinct

对元素去重

scala> val flatdistincedata = textFile.flatMap(line => line.split("\\s+")).distinct();
flatdistincedata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at distinct at <console>:26

scala> flatdistincedata.collect();
res0: Array[String] = Array(Hello, SCALA, World, Yang)

RDD基本转换操作(2)–coalesce、repartition

  • coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
将RDD的分区数量重新调整为numPartitions个,如果shuffle为true,分区数量可以大于原值。

scala> var data = sc.textFile("/user/test/1.txt");
data: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[16] at textFile at <console>:24

scala> data.collect();
res9: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)

scala> data.partitions.size;
res11: Int = 2

scala> var newdata = data.coalesce(1);
newdata: org.apache.spark.rdd.RDD[String] = CoalescedRDD[17] at coalesce at <console>:26

scala> newdata.collect();
res12: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)

scala> newdata.partitions.size;
res13: Int = 1

// shuttle 默认为true,新分区数量大于原分区值,无效
scala> var newdata = data.coalesce(3);  
newdata: org.apache.spark.rdd.RDD[String] = CoalescedRDD[18] at coalesce at <console>:26

scala> newdata.partitions.size;
res14: Int = 2

// shuttle 默认为false,新分区数量可以大于原分区值,有效
scala> var newdata = data.coalesce(3,true);
newdata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[22] at coalesce at <console>:26

scala> newdata.partitions.size;
res15: Int = 3

  • repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[String]): org.apache.spark.rdd.RDD[String]
相当于 coalesce 的shuttle 为true

scala> var newdata1 = data.repartition(3);
newdata1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at repartition at <console>:26

scala> newdata1.partition
partitioner   partitions

scala> newdata1.partitions.size
res16: Int = 3

RDD基本转换操作(3)–union(并集)、intersection(交集)、subtract(差集)

  • union

def union(other: RDD[T]): RDD[T]
将两个RDD 合并

scala> var rdd1 = sc.makeRDD(1 to 5,1);
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(6 to 10,2);
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at makeRDD at <console>:24
scala> rdd1.union(rdd2).collect();
res26: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  • intersetion

def intersection(other: org.apache.spark.rdd.RDD[Int],numPartitions: Int): org.apache.spark.rdd.RDD[Int]
def intersection(other: org.apache.spark.rdd.RDD[Int],partitioner: org.apache.spark.Partitioner)(implicit ord: Ordering[Int]): org.apache.spark.rdd.RDD[Int]
def intersection(other: org.apache.spark.rdd.RDD[Int]): org.apache.spark.rdd.RDD[Int]
求两个RDD的交集,可以指定返回的RDD的分区数或者指定分区函数。不去重

scala> var rdd1 = sc.makeRDD(1 to 5,1);
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(3 to 7,1);
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at makeRDD at <console>:24
scala> rdd1.intersection(rdd2).collect();
res28: Array[Int] = Array(4, 3, 5)
scala> var rdd3 = rdd1.intersection(rdd2,4);
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[50] at intersection at <console>:28
scala> rdd3.partitions.size
res31: Int = 4
  • substract

def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
返回在RDD中出现,并且不在otherRDD中出现的元素,不去重

scala> var rdd1 = sc.makeRDD(Seq(1,1,2,2,3,3));
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[51] at makeRDD at <console>:24

scala> var rdd2 = sc.makeRDD(Seq(3,3,4,4,5,5));
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[52] at makeRDD at <console>:24

scala> rdd1.subtract(rdd2).collect();
res32: Array[Int] = Array(1, 1, 2, 2)

RDD基本转换操作(4)-mapPartitions 、mapPartitionsWithIndex

  • mapPartitions

mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator< T > => Iterator< U > when running on an RDD of type T.
跟map类似,区别在于mapPartitions作用在RDD的每一个分区上,func需要实现从 Iterator< T > 到Iterator< U > 的转换。

scala> var rdd = sc.makeRDD(1 to 10,4);
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[57] at makeRDD at <console>:24
scala> var rdd1 = rdd.mapPartitions{ x => {
     | var result = List[Int]()
     |     var i = 0;
     |     while (x.hasNext){
     |        i += x.next()
     |     }
     |     result.::(i).iterator
     | }}
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[58] at mapPartitions at <console>:26

scala> rdd1.collect
res33: Array[Int] = Array(3, 12, 13, 27)

scala> rdd.partitions.size
res34: Int = 4

  • mapPartitionsWithIndex

mapPartitionsWithIndex(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator< T >) => Iterator< U > when running on an RDD of type T.
跟mapPartitions 类似,区别在于 func需要实现(Int, Iterator< T >) 到 Iterator< U > 的类型转换,第一个整数代表分区的下标

scala> var rdd1 = rdd.mapPartitionsWithIndex{ (index,x) => {
     |     var result = List[String]()
     |     var i = 0
     |     while (x.hasNext){
     |        i+=x.next()
     |     }
     |     result.::(index+"|"+i).iterator
     |  }
     | }
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[60] at mapPartitionsWithIndex at <console>:26

scala> rdd1.collect
res38: Array[String] = Array(0|3, 1|12, 2|13, 3|27)

RDD基本转换操作(5)

  • groupByKey

**def groupByKey(): RDD[(K, Iterable[V])] **
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] // 指定分区数
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

scala> var arr = Array(("1","YANG"),("1","YANG1"),("2","HELLO"),("2","HELLO2"),("3","WORLD"));
arr: Array[(String, String)] = Array((1,YANG), (1,YANG1), (2,HELLO), (2,HELLO2), (3,WORLD))

scala> var rdd = sc.makeRDD(arr);
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[61] at makeRDD at <console>:26

scala> var rdd1 = rdd.groupByKey();
rdd1: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[62] at groupByKey at <console>:28

scala> rdd1.collect
res39: Array[(String, Iterable[String])] = Array((1,CompactBuffer(YANG, YANG1)), (2,CompactBuffer(HELLO, HELLO2)), (3,CompactBuffer(WORLD)))

  • reduceByKey

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

scala> var rdd = sc.makeRDD(Array(("A",1),("A",2),("A",3),("B",3),("B",3)));
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[63] at makeRDD at <console>:24

scala> var rdd1 = rdd.reduceByKey((x,y) => x+y)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[64] at reduceByKey at <console>:26

scala> rdd1.collect
res40: Array[(String, Int)] = Array((A,6), (B,6))

scala> rdd.partitions.size
res41: Int = 4

scala> rdd1.partitions.size
res42: Int = 4

scala> var rdd1 = rdd.reduceByKey((x,y) => x+y,10);
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[65] at reduceByKey at <console>:26

scala> rdd1.collect
res43: Array[(String, Int)] = Array((A,6), (B,6))                               

scala> rdd1.partitions.size
res44: Int = 10
  • reduceByKeyLocally

类似与reduceByKey ,但是返回的不是RDD,而是Map

scala> rdd.collect
res45: Array[(String, Int)] = Array((A,1), (A,2), (A,3), (B,3), (B,3))

scala> rdd.reduceByKeyLocally((x,y) => x+y)
res46: scala.collection.Map[String,Int] = Map(B -> 6, A -> 6)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,319评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,801评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,567评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,156评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,019评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,090评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,500评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,192评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,474评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,566评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,338评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,212评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,572评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,890评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,169评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,478评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,661评论 2 335

推荐阅读更多精彩内容