Common Operators

http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

RDD function calls

aggregate

aggregateByKey [Pair]

cartesian

checkpoint

coalesce, repartition

cogroup [Pair], groupWith [Pair]

collect, toArray

collectAsMap [Pair]

combineByKey [Pair]

compute

context, sparkContext

count

countApprox

countApproxDistinct

countApproxDistinctByKey [pair]

countByKey [pair]

countByKeyApprox [pair]

countByValue

countByValueApprox

dependencies

distinct

first

filter

filterByRange [Ordered]

filterWith

flatMap

flatMapValues [Pair]

flatMapWith

fold

foldByKey [Pair]

foreach

foreachPartition

foreachWith

fullOuterJoin [Pair]

generator, setGenerator

getCheckpointFile

preferredLocations

getStorageLevel

glom

groupBy

groupByKey [Pair]

histogram [Double]

id

intersection

isCheckpointed

iterator

join [Pair]

keyBy

keys [Pair]

leftOuterJoin [Pair]

lookup [Pair]

map

mapPartitions

mapPartitionsWithContext

mapPartitionsWithIndex

mapPartitionsWithSplit

mapValues [Pair]

mapWith

max

mean [Double], meanApprox [Double]

min

name, setName

partitionBy [Pair]

partitioner

partitions

persist, cache

pipe

randomSplit

reduce

reduceByKey [Pair], reduceByKeyLocally[Pair], reduceByKeyToDriver[Pair]

repartition

repartitionAndSortWithinPartitions [Ordered]

rightOuterJoin [Pair] 

sample

sampleByKey [Pair]

sampleByKeyExact [Pair]

saveAsHadoopFile [Pair], saveAsHadoopDataset [Pair], saveAsNewAPIHadoopFile [Pair]

saveAsObjectFile

saveAsSequenceFile [SeqFile]

saveAsTextFile

stats [Double]

sortBy

sortByKey [Ordered]

stdev [Double], sampleStdev [Double]

subtract

subtractByKey [Pair]

sum [Double], sumApprox[Double]

take

takeOrdered

takeSample

treeAggregate

treeReduce

toDebugString

toJavaRDD

toLocalIterator

top

toString

union, ++

unpersist

values [Pair]

variance [Double], sampleVariance [Double]

zip

zipPartitions

zipWithIndex

zipWithUniqueId

Our research group has a very strong focus on using and improving Apache Spark to solve real-world problems. In order to do this we need to have a very solid understanding of the capabilities of Spark. So one of the first things we have done is to go through the entire Spark RDD API and write examples to test their functionality. This has been a very useful exercise and we would like to share the examples with everyone.

Authors of examples: Matthias Langer and Zhen He

Email addresses: m.langer@latrobe.edu.au, z.he@latrobe.edu.au

These examples have only been tested for Spark version 1.4. We assume the functionality of Spark is stable and therefore the examples should be valid for later releases.

If you find any errors in the examples we would love to hear about them so we can fix them up, so please email us to let us know.

The RDD API By Example

RDD is short for Resilient Distributed Dataset. RDDs are the workhorse of the Spark system. As a user, one can consider a RDD as a handle for a collection of individual data partitions, which are the result of some computation.

However, an RDD is actually more than that. On cluster installations, separate data partitions can be on separate nodes. Using the RDD as a handle one can access all partitions and perform computations and transformations using the contained data. Whenever a part of a RDD or an entire RDD is lost, the system is able to reconstruct the data of lost partitions by using lineage information. Lineage refers to the sequence of transformations used to produce the current RDD. As a result, Spark is able to recover automatically from most failures.

All RDDs available in Spark derive either directly or indirectly from the class RDD. This class comes with a large set of methods that perform operations on the data within the associated partitions. The class RDD is abstract. Whenever one uses an RDD, one is actually using a concrete implementation of RDD. These implementations have to override some core functions to make the RDD behave as expected.

One reason why Spark has lately become a very popular system for processing big data is that it does not impose restrictions regarding what data can be stored within RDD partitions. The RDD API already contains many useful operations. But, because the creators of Spark had to keep the core API of RDDs common enough to handle arbitrary data-types, many convenience functions are missing.

The basic RDD API considers each data item as a single value. However, users often want to work with key-value pairs. Therefore Spark extended the interface of RDD to provide additional functions (PairRDDFunctions), which explicitly work on key-value pairs. Currently, there are four extensions to the RDD API available in Spark. They are as follows:

DoubleRDDFunctions 

This extension contains many useful methods for aggregating numeric values. They become available if the data items of an RDD are implicitly convertible to the Scala data-type double.

PairRDDFunctions 

Methods defined in this interface extension become available when the data items have a two-component tuple structure. Spark will interpret the first tuple item (i.e. tuplename._1) as the key and the second item (i.e. tuplename._2) as the associated value.

OrderedRDDFunctions 

Methods defined in this interface extension become available if the data items are two-component tuples where the key is implicitly sortable.

SequenceFileRDDFunctions 

This extension contains several methods that allow users to create Hadoop sequence files from RDDs. The data items must be two-component key-value tuples as required by the PairRDDFunctions. However, there are additional requirements regarding the convertibility of the tuple components to Writable types.

Since Spark will make methods with extended functionality automatically available to users when the data items fulfill the above described requirements, we decided to list all possible available functions in strictly alphabetical order. We will append one of the following tags to the function name to indicate that it belongs to an extension that requires the data items to conform to a certain format or type (a short example follows the tag list below).

[Double] - Double RDD Functions

[Ordered] - OrderedRDDFunctions

[Pair] - PairRDDFunctions

[SeqFile] - SequenceFileRDDFunctions
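To make these tags concrete, here is a minimal sketch (assuming a running spark-shell; the RDD names are purely illustrative) of how the extension methods simply become available once the element type fits:

val doubles = sc.parallelize(List(1.0, 2.0, 3.0))

doubles.mean   // DoubleRDDFunctions: the items are convertible to Double

val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))

pairs.reduceByKey(_ + _).collect   // PairRDDFunctions: the items are two-component tuples

pairs.sortByKey().collect   // OrderedRDDFunctions: the String keys are implicitly sortable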

aggregate

The aggregate function allows the user to apply two different reduce functions to the RDD. The first reduce function is applied within each partition to reduce the data within each partition into a single result. The second reduce function is used to combine the different reduced results of all partitions together to arrive at one final result. The ability to have two separate reduce functions for intra partition versus across partition reducing adds a lot of flexibility. For example the first reduce function can be the max function and the second one can be the sum function. The user also specifies an initial value. Here are some important facts.

The initial value is applied at both levels of reduce. So both at the intra partition reduction and across partition reduction.

Both reduce functions have to be commutative and associative.

Do not assume any execution order for either partition computations or combining partitions.

Why would one want to use two input data types? Let us assume we do an archaeological site survey using a metal detector. While walking through the site we take GPS coordinates of important findings based on the output of the metal detector. Later, we intend to draw an image of a map that highlights these locations using the aggregate function. In this case the zeroValue could be an area map with no highlights. The possibly huge set of input data is stored as GPS coordinates across many partitions. seqOp (first reducer) could convert the GPS coordinates to map coordinates and put a marker on the map at the respective position. combOp (second reducer) will receive these highlights as partial maps and combine them into a single final output map.

Listing Variants

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

Examples 1

val z = sc.parallelize(List(1,2,3,4,5,6), 2)

// lets first print out the contents of the RDD with partition labels

def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {

iter.map(x => "[partID:" +  index + ", val: " + x + "]")

}

z.mapPartitionsWithIndex(myfunc).collect

res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])

z.aggregate(0)(math.max(_, _), _ + _)

res40: Int = 9

// This example returns 16 since the initial value is 5

// reduce of partition 0 will be max(5, 1, 2, 3) = 5

// reduce of partition 1 will be max(5, 4, 5, 6) = 6

// final reduce across partitions will be 5 + 5 + 6 = 16

// note the final reduce includes the initial value

z.aggregate(5)(math.max(_, _), _ + _)

res29: Int = 16

val z = sc.parallelize(List("a","b","c","d","e","f"),2)

//lets first print out the contents of the RDD with partition labels

def myfunc(index: Int, iter: Iterator[(String)]) : Iterator[String] = {

iter.map(x => "[partID:" +  index + ", val: " + x + "]")

}

z.mapPartitionsWithIndex(myfunc).collect

res31: Array[String] = Array([partID:0, val: a], [partID:0, val: b], [partID:0, val: c], [partID:1, val: d], [partID:1, val: e], [partID:1, val: f])

z.aggregate("")(_ + _, _+_)

res115: String = abcdef

// See here how the initial value "x" is applied three times.

//  - once for each partition

//  - once when combining all the partitions in the second reduce function.

z.aggregate("x")(_ + _, _+_)

res116: String = xxdefxabc

// Below are some more advanced examples. Some are quite tricky to work out.

val z = sc.parallelize(List("12","23","345","4567"),2)

z.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)

res141: String = 42

z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

res142: String = 11

val z = sc.parallelize(List("12","23","345",""),2)

z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

res143: String = 10

The main issue with the code above is that the result of the inner min is a string of length 1. 

The zero in the output is due to the empty string being the last string in the list. We see this result because we are not recursively reducing any further within the partition for the final string.

Examples 2

val z = sc.parallelize(List("12","23","","345"),2)

z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

res144: String = 11

In contrast to the previous example, this example has the empty string at the beginning of the second partition. This results in a length of zero being input to the second reduce, which then upgrades it to a length of 1. (Warning: The above example shows bad design since the output is dependent on the order of the data inside the partitions.)

aggregateByKey [Pair]

Works like the aggregate function except the aggregation is applied to the values with the same key. Also unlike the aggregate function the initial value is not applied to the second reduce.

Listing Variants

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

Example

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

// lets have a look at what is in the partitions

def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {

iter.map(x => "[partID:" +  index + ", val: " + x + "]")

}

pairRDD.mapPartitionsWithIndex(myfunc).collect

res2: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])

pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect

res3: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))

pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect

res4: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))

cartesian

Computes the cartesian product between two RDDs (i.e. each item of the first RDD is joined with each item of the second RDD) and returns them as a new RDD. (Warning: Be careful when using this function! Memory consumption can quickly become an issue!)

Listing Variants

def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

Example

val x = sc.parallelize(List(1,2,3,4,5))

val y = sc.parallelize(List(6,7,8,9,10))

x.cartesian(y).collect

res0: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))

checkpoint

Will create a checkpoint when the RDD is computed next. Checkpointed RDDs are stored as a binary file within the checkpoint directory which can be specified using the Spark context. (Warning: Spark applies lazy evaluation. Checkpointing will not occur until an action is invoked.)

Important note: the directory  "my_directory_name" should exist in all slaves. As an alternative you could use an HDFS directory URL as well.

Listing Variants

def checkpoint()

Example

sc.setCheckpointDir("my_directory_name")

val a = sc.parallelize(1 to 4)

a.checkpoint

a.count

14/02/25 18:13:53 INFO SparkContext: Starting job: count at :15

...

14/02/25 18:13:53 INFO MemoryStore: Block broadcast_5 stored as values to memory (estimated size 115.7 KB, free 296.3 MB)

14/02/25 18:13:53 INFO RDDCheckpointData: Done checkpointing RDD 11 to file:/home/cloudera/Documents/spark-0.9.0-incubating-bin-cdh4/bin/my_directory_name/65407913-fdc6-4ec1-82c9-48a1656b95d6/rdd-11, new parent is RDD 12

res23: Long = 4

coalesce, repartition

Coalesces the associated data into a given number of partitions. repartition(numPartitions) is simply an abbreviation for coalesce(numPartitions, shuffle = true).

Listing Variants

def coalesce ( numPartitions : Int , shuffle : Boolean = false ): RDD [T]

def repartition ( numPartitions : Int ): RDD [T]

Example

val y = sc.parallelize(1 to 10, 10)

val z = y.coalesce(2, false)

z.partitions.length

res9: Int = 2
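As an additional illustrative sketch (the variable name w is ours): with shuffle = true, which is what repartition uses under the hood, coalesce can also increase the number of partitions, whereas the call above can only reduce them.

val w = y.coalesce(20, true)

w.partitions.length   // 20

y.repartition(20).partitions.length   // also 20, equivalent to the line above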

cogroup [Pair], groupWith [Pair]

A very powerful set of functions that allow grouping up to 3 key-value RDDs together using their keys.

Listing Variants

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

Examples

val a = sc.parallelize(List(1, 2, 1, 3), 1)

val b = a.map((_, "b"))

val c = a.map((_, "c"))

b.cogroup(c).collect

res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array(

(2,(ArrayBuffer(b),ArrayBuffer(c))),

(3,(ArrayBuffer(b),ArrayBuffer(c))),

(1,(ArrayBuffer(b, b),ArrayBuffer(c, c)))

)

val d = a.map((_, "d"))

b.cogroup(c, d).collect

res9: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array(

(2,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),

(3,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),

(1,(ArrayBuffer(b, b),ArrayBuffer(c, c),ArrayBuffer(d, d)))

)

val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")), 2)

val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")), 2)

x.cogroup(y).collect

res23: Array[(Int, (Iterable[String], Iterable[String]))] = Array(

(4,(ArrayBuffer(kiwi),ArrayBuffer(iPad))), 

(2,(ArrayBuffer(banana),ArrayBuffer())), 

(3,(ArrayBuffer(orange),ArrayBuffer())),

(1,(ArrayBuffer(apple),ArrayBuffer(laptop, desktop))),

(5,(ArrayBuffer(),ArrayBuffer(computer))))

collect, toArray

Converts the RDD into a Scala array and returns it. If you provide a standard map-function (i.e. f = T -> U) it will be applied before inserting the values into the result array.

Listing Variants

def collect(): Array[T]

def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]

def toArray(): Array[T]

Example

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)

c.collect

res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)

collectAsMap [Pair] 

Similar to collect, but works on key-value RDDs and converts them into Scala maps to preserve their key-value structure.

Listing Variants

def collectAsMap(): Map[K, V]

Example

val a = sc.parallelize(List(1, 2, 1, 3), 1)

val b = a.zip(a)

b.collectAsMap

res1: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)

combineByKey [Pair]

A very efficient implementation that combines the values of an RDD consisting of two-component tuples by applying several aggregators one after another: createCombiner turns the first value seen for a key into a combiner, mergeValue folds further values of that key into the combiner within a partition, and mergeCombiners merges the combiners produced by different partitions.

Listing Variants

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)]

Example

val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)

val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)

val c = b.zip(a)

val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)

d.collect

res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
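A second, purely illustrative sketch (the RDD and variable names are ours) showing the classic use of combineByKey: computing a per-key average by carrying a (sum, count) pair as the combiner.

val scores = sc.parallelize(List(("cat", 2.0), ("cat", 5.0), ("mouse", 4.0), ("cat", 12.0)), 2)

// the three arguments are createCombiner, mergeValue and mergeCombiners, in that order
val sumCount = scores.combineByKey((v: Double) => (v, 1), (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1), (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2))

sumCount.mapValues(p => p._1 / p._2).collect   // cat -> 19.0 / 3 (about 6.33), mouse -> 4.0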

compute

Executes dependencies and computes the actual representation of the RDD. This function should not be called directly by users.

Listing Variants

def compute(split: Partition, context: TaskContext): Iterator[T]

context, sparkContext

Returns the SparkContext that was used to create the RDD.

Listing Variants

def context: SparkContext

def sparkContext: SparkContext

Example

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)

c.context

res8: org.apache.spark.SparkContext = org.apache.spark.SparkContext@58c1c2f1

count

Returns the number of items stored within a RDD.

Listing Variants

def count(): Long

Example

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)

c.count

res2: Long = 4

countApprox

Marked as experimental feature! Experimental features are currently not covered by this document!

Listing Variants

def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]

countApproxDistinct

Computes the approximate number of distinct values. For large RDDs which are spread across many nodes, this function may execute faster than other counting methods. The parameter relativeSD controls the accuracy of the computation.

Listing Variants

def countApproxDistinct(relativeSD: Double = 0.05): Long

Example

val a = sc.parallelize(1 to 10000, 20)

val b = a++a++a++a++a

b.countApproxDistinct(0.1)

res14: Long = 8224

b.countApproxDistinct(0.05)

res15: Long = 9750

b.countApproxDistinct(0.01)

res16: Long = 9947

b.countApproxDistinct(0.001)

res0: Long = 10000

countApproxDistinctByKey [Pair]


Similar to countApproxDistinct, but computes the approximate number of distinct values for each distinct key. Hence, the RDD must consist of two-component tuples. For large RDDs which are spread across many nodes, this function may execute faster than other counting methods. The parameter relativeSD controls the accuracy of the computation.

Listing Variants

def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)]

def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)]

def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)]

Example

val a = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)

val b = sc.parallelize(a.takeSample(true, 10000, 0), 20)

val c = sc.parallelize(1 to b.count().toInt, 20)

val d = b.zip(c)

d.countApproxDistinctByKey(0.1).collect

res15: Array[(String, Long)] = Array((Rat,2567), (Cat,3357), (Dog,2414), (Gnu,2494))

d.countApproxDistinctByKey(0.01).collect

res16: Array[(String, Long)] = Array((Rat,2555), (Cat,2455), (Dog,2425), (Gnu,2513))

d.countApproxDistinctByKey(0.001).collect

res0: Array[(String, Long)] = Array((Rat,2562), (Cat,2464), (Dog,2451), (Gnu,2521))

countByKey [Pair]

Very similar to count, but counts the values of a RDD consisting of two-component tuples for each distinct key separately.

Listing Variants

def countByKey(): Map[K, Long]

Example

val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)

c.countByKey

res3: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)

countByKeyApprox [Pair]

Marked as experimental feature! Experimental features are currently not covered by this document!

Listing Variants

def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]

countByValue

Returns a map that contains all unique values of the RDD and their respective occurrence counts. (Warning: This operation will finally aggregate the information in a single reducer.)

Listing Variants

def countByValue(): Map[T, Long]

Example

val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))

b.countByValue

res27: scala.collection.Map[Int,Long] = Map(5 -> 1, 8 -> 1, 3 -> 1, 6 -> 1, 1 -> 6, 2 -> 3, 4 -> 2, 7 -> 1)

countByValueApprox

Marked as experimental feature! Experimental features are currently not covered by this document!

Listing Variants

def countByValueApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[T, BoundedDouble]]

dependencies


Returns the dependencies of this RDD, i.e. how it depends on its parent RDDs.

Listing Variants

final def dependencies: Seq[Dependency[_]]

Example

val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))

b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at :12

b.dependencies.length

Int = 0

b.map(a => a).dependencies.length

res40: Int = 1

val a = sc.parallelize(1 to 5)   // any second RDD will do here

b.cartesian(a).dependencies.length

res41: Int = 2

b.cartesian(a).dependencies

res42: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.rdd.CartesianRDD$$anon$1@576ddaaa, org.apache.spark.rdd.CartesianRDD$$anon$2@6d2efbbd)

distinct


Returns a new RDD that contains each unique value only once.

Listing Variants

def distinct(): RDD[T]

def distinct(numPartitions: Int): RDD[T]

Example

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)

c.distinct.collect

res6: Array[String] = Array(Dog, Gnu, Cat, Rat)

val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))

a.distinct(2).partitions.length

res16: Int = 2

a.distinct(3).partitions.length

res17: Int = 3

first


Looks for the very first data item of the RDD and returns it.

Listing Variants

def first(): T

Example

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)

c.first

res1: String = Gnu

filter


Evaluates a boolean function for each data item of the RDD and puts the items for which the function returned true into the resulting RDD.

Listing Variants

def filter(f: T => Boolean): RDD[T]

Example

val a = sc.parallelize(1 to 10, 3)

val b = a.filter(_ % 2 == 0)

b.collect

res3: Array[Int] = Array(2, 4, 6, 8, 10)

When you provide a filter function, it must be able to handle all data items contained in the RDD. Scala provides so-called partial functions to deal with mixed data-types. (Tip: Partial functions are very useful if you have some data which may be bad and you do not want to handle, but for the good (matching) data you want to apply some kind of map function. Note that case has to be used when defining a partial function.)

Examples for mixed data without partial functions

val b = sc.parallelize(1 to 8)

b.filter(_ < 4).collect

res15: Array[Int] = Array(1, 2, 3)

val a = sc.parallelize(List("cat", "horse", 4.0, 3.5, 2, "dog"))

a.filter(_ < 4).collect

<console>:15: error: value < is not a member of Any

This fails because some components of a are not implicitly comparable against integers. Collect uses the isDefinedAt property of a function-object to determine whether the test-function is compatible with each data item. Only data items that pass this test (=filter) are then mapped using the function-object.

Examples for mixed data with partial functions

val a = sc.parallelize(List("cat", "horse", 4.0, 3.5, 2, "dog"))

a.collect({case a: Int    => "is integer"

case b: String => "is string" }).collect

res17: Array[String] = Array(is string, is string, is integer, is string)

val myfunc: PartialFunction[Any, Any] = {

case a: Int    => "is integer"

case b: String => "is string" }

myfunc.isDefinedAt("")

res21: Boolean = true

myfunc.isDefinedAt(1)

res22: Boolean = true

myfunc.isDefinedAt(1.5)

res23: Boolean = false

Be careful! The above code works because it only checks the type itself! If you use operations on this type, you have to explicitly declare what type you want instead of Any. Otherwise the compiler does (apparently) not know what bytecode it should produce:

val myfunc2: PartialFunction[Any, Any] = {case x if (x < 4) => "x"}

<console>:10: error: value < is not a member of Any

val myfunc2: PartialFunction[Int, Any] = {case x if (x < 4) => "x"}

myfunc2: PartialFunction[Int,Any] =

filterByRange [Ordered]


Returns an RDD containing only the items in the key range specified. From our testing, it appears this only works if your data is in key value pairs and it has already been sorted by key.

Listing Variants

def filterByRange(lower: K, upper: K): RDD[P]

Example

val randRDD = sc.parallelize(List( (2,"cat"), (6, "mouse"),(7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)

val sortedRDD = randRDD.sortByKey()

sortedRDD.filterByRange(1, 3).collect

res66: Array[(Int, String)] = Array((1,screen), (2,cat), (3,book))

filterWith (deprecated)


This is an extended version of filter. It takes two function arguments. The first argument must conform to Int -> A and is executed once per partition; it transforms the partition index into a value of type A. The second function conforms to (T, A) -> Boolean, where T is a data item of the RDD and A is the transformed partition index, and has to return either true or false (i.e. apply the filter).

Listing Variants

def filterWith[A: ClassTag](constructA: Int => A)(p: (T, A) => Boolean): RDD[T]

Example

val a = sc.parallelize(1 to 9, 3)

val b = a.filterWith(i => i)((x,i) => x % 2 == 0 || i % 2 == 0)

b.collect

res37: Array[Int] = Array(1, 2, 3, 4, 6, 7, 8, 9)

val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 5)

a.filterWith(x=> x)((a, b) =>  b == 0).collect

res30: Array[Int] = Array(1, 2)

a.filterWith(x=> x)((a, b) =>  a % (b+1) == 0).collect

res33: Array[Int] = Array(1, 2, 4, 6, 8, 10)

a.filterWith(x=> x.toString)((a, b) =>  b == "2").collect

res34: Array[Int] = Array(5, 6)

flatMap


Similar to map, but allows emitting more than one item in the map function.

Listing Variants

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

Example

val a = sc.parallelize(1 to 10, 5)

a.flatMap(1 to _).collect

res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect

res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

// The program below generates a random number of copies (up to 10) of the items in the list.

val x  = sc.parallelize(1 to 10, 3)

x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)

flatMapValues [Pair]


Very similar to mapValues, but collapses the inherent structure of the values during mapping.

Listing Variants

def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]

Example

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

val b = a.map(x => (x.length, x))

b.flatMapValues("x" + _ + "x").collect

res6: Array[(Int, Char)] = Array((3,x), (3,d), (3,o), (3,g), (3,x), (5,x), (5,t), (5,i), (5,g), (5,e), (5,r), (5,x), (4,x), (4,l), (4,i), (4,o), (4,n), (4,x), (3,x), (3,c), (3,a), (3,t), (3,x), (7,x), (7,p), (7,a), (7,n), (7,t), (7,h), (7,e), (7,r), (7,x), (5,x), (5,e), (5,a), (5,g), (5,l), (5,e), (5,x))

flatMapWith (deprecated)


Similar to flatMap, but allows accessing the partition index or a derivative of the partition index from within the flatMap-function.

Listing Variants

def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]

Example

val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)

a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect

res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)

fold


Aggregates the values of each partition. The aggregation variable within each partition is initialized with zeroValue, and zeroValue is folded in once more when the per-partition results are combined (see the sketch after the example below).

Listing Variants

def fold(zeroValue: T)(op: (T, T) => T): T

Example

val a = sc.parallelize(List(1,2,3), 3)

a.fold(0)(_ + _)

res59: Int = 6
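A short additional sketch to illustrate the role of zeroValue (the numbers here are our own): like aggregate, fold folds zeroValue into every partition and once more when the per-partition results are merged, so a non-neutral zeroValue is counted numPartitions + 1 times.

val b = sc.parallelize(List(1, 2, 3), 3)

b.fold(1)(_ + _)   // 10: the zeroValue 1 is used in each of the 3 partitions and once more in the final merge (4 * 1 + 1 + 2 + 3)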

foldByKey [Pair]


Very similar to fold, but performs the folding separately for each key of the RDD. This function is only available if the RDD consists of two-component tuples.

Listing Variants

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

Example

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)

val b = a.map(x => (x.length, x))

b.foldByKey("")(_ + _).collect

res84: Array[(Int, String)] = Array((3,dogcatowlgnuant))

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

val b = a.map(x => (x.length, x))

b.foldByKey("")(_ + _).collect

res85: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))

foreach


Executes a function, purely for its side effects, on each data item.

Listing Variants

def foreach(f: T => Unit)

Example

val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)

c.foreach(x => println(x + "s are yummy"))

lions are yummy

gnus are yummy

crocodiles are yummy

ants are yummy

whales are yummy

dolphins are yummy

spiders are yummy

foreachPartition


Executes a function, purely for its side effects, once for each partition. Access to the data items contained in the partition is provided via the iterator argument.

Listing Variants

def foreachPartition(f: Iterator[T] => Unit)

Example

val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)

b.foreachPartition(x => println(x.reduce(_ + _)))

6

15

24

foreachWith (Deprecated)


This is an extended version of foreach. It takes two function arguments. The first argument must conform to Int -> A and is executed once per partition; it transforms the partition index into a value of type A. The second function conforms to (T, A) -> Unit and is executed for every data item together with that per-partition value.

Listing Variants

def foreachWith[A: ClassTag](constructA: Int => A)(f: (T, A) => Unit)

Example

val a = sc.parallelize(1 to 9, 3)

a.foreachWith(i => i)((x,i) => if (x % 2 == 1 && i % 2 == 0) println(x) )

1

3

7

9

fullOuterJoin [Pair]


Performs the full outer join between two paired RDDs.

Listing Variants

def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]

def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]

def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]

Example

val pairRDD1 = sc.parallelize(List( ("cat",2), ("cat", 5), ("book", 4),("cat", 12)))

val pairRDD2 = sc.parallelize(List( ("cat",2), ("cup", 5), ("mouse", 4),("cat", 12)))

pairRDD1.fullOuterJoin(pairRDD2).collect

res5: Array[(String, (Option[Int], Option[Int]))] = Array((book,(Some(4),None)), (mouse,(None,Some(4))), (cup,(None,Some(5))), (cat,(Some(2),Some(2))), (cat,(Some(2),Some(12))), (cat,(Some(5),Some(2))), (cat,(Some(5),Some(12))), (cat,(Some(12),Some(2))), (cat,(Some(12),Some(12))))

generator, setGenerator


Allows setting a string that is attached to the end of the RDD's name when printing the dependency graph.

Listing Variants

@transient var generator

def setGenerator(_generator: String)

getCheckpointFile


Returns the path to the checkpoint file (as an Option), or None if the RDD has not yet been checkpointed.

Listing Variants

def getCheckpointFile: Option[String]

Example

sc.setCheckpointDir("/home/cloudera/Documents")

val a = sc.parallelize(1 to 500, 5)

val b = a++a++a++a++a

b.getCheckpointFile

res49: Option[String] = None

b.checkpoint

b.getCheckpointFile

res54: Option[String] = None

b.collect

b.getCheckpointFile

res57: Option[String] = Some(file:/home/cloudera/Documents/cb978ffb-a346-4820-b3ba-d56580787b20/rdd-40)

preferredLocations


Returns the hosts which are preferred by this RDD. The actual preference of a specific host depends on various assumptions.

Listing Variants

final def preferredLocations(split: Partition): Seq[String]
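Example

A small illustrative sketch: for an RDD created with parallelize the preference list is typically empty, while RDDs backed by HDFS files report the hosts that hold the corresponding blocks.

val a = sc.parallelize(1 to 100, 2)

a.preferredLocations(a.partitions(0))   // usually an empty Seq for a parallelized collection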

getStorageLevel


Retrieves the currently set storage level of the RDD. A new storage level can only be assigned if the RDD does not have one set yet. The example below shows the error you will get when you try to reassign the storage level.

Listing Variants

def getStorageLevel

Example

val a = sc.parallelize(1 to 100000, 2)

a.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)

a.getStorageLevel.description

String = Disk Serialized 1x Replicated

a.cache

java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level

glom


Assembles an array that contains all elements of the partition and embeds it in an RDD. Each returned array contains the contents of one partition.

Listing Variants

def glom(): RDD[Array[T]]

Example

val a = sc.parallelize(1 to 100, 3)

a.glom.collect

res8: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66), Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100))

groupBy


Groups the data items of the RDD according to the value returned by the supplied discriminator function: the function's result becomes the key, and all items that produced the same result are collected into the associated Iterable.

Listing Variants

def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]

def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])]

def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])]

Example

val a = sc.parallelize(1 to 9, 3)

a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect

res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9)))

val a = sc.parallelize(1 to 9, 3)

def myfunc(a: Int) : Int =

{

a % 2

}

a.groupBy(myfunc).collect

res3: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))

val a = sc.parallelize(1 to 9, 3)

def myfunc(a: Int) : Int =

{

a % 2

}

a.groupBy(x => myfunc(x), 3).collect

a.groupBy(myfunc(_), 1).collect

res7: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))

import org.apache.spark.Partitioner

class MyPartitioner extends Partitioner {

def numPartitions: Int = 2

def getPartition(key: Any): Int =

{

key match

{

case null     => 0

case key: Int => key          % numPartitions

case _        => key.hashCode % numPartitions

}

}

override def equals(other: Any): Boolean =

{

other match

{

case h: MyPartitioner => true

case _                => false

}

}

}

val a = sc.parallelize(1 to 9, 3)

val p = new MyPartitioner()

val b = a.groupBy((x:Int) => { x }, p)

val c = b.mapWith(i => i)((a, b) => (b, a))

c.collect

res42: Array[(Int, (Int, Seq[Int]))] = Array((0,(4,ArrayBuffer(4))), (0,(2,ArrayBuffer(2))), (0,(6,ArrayBuffer(6))), (0,(8,ArrayBuffer(8))), (1,(9,ArrayBuffer(9))), (1,(3,ArrayBuffer(3))), (1,(1,ArrayBuffer(1))), (1,(7,ArrayBuffer(7))), (1,(5,ArrayBuffer(5))))

groupByKey [Pair]


Very similar to groupBy, but instead of supplying a function, the key-component of each pair will automatically be presented to the partitioner.

Listing Variants

def groupByKey(): RDD[(K, Iterable[V])]

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

Example

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)

val b = a.keyBy(_.length)

b.groupByKey.collect

res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))

histogram [Double]


These functions take an RDD of doubles and create a histogram with either even spacing (the number of buckets equals bucketCount) or arbitrary spacing based on custom bucket boundaries supplied by the user via an array of double values. The result types of the two variants differ slightly: the first function returns a tuple consisting of two arrays. The first array contains the computed bucket boundary values and the second array contains the corresponding counts of values (i.e. the histogram). The second variant just returns the histogram as an array of longs.

Listing Variants

def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]]

def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long]

Example with even spacing

val a = sc.parallelize(List(1.1, 1.2, 1.3, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 9.0), 3)

a.histogram(5)

res11: (Array[Double], Array[Long]) = (Array(1.1, 2.68, 4.26, 5.84, 7.42, 9.0),Array(5, 0, 0, 1, 4))

val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3)

a.histogram(6)

res18: (Array[Double], Array[Long]) = (Array(1.0, 2.5, 4.0, 5.5, 7.0, 8.5, 10.0),Array(6, 0, 1, 1, 3, 4))

Example with custom spacing

val a = sc.parallelize(List(1.1, 1.2, 1.3, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 9.0), 3)

a.histogram(Array(0.0, 3.0, 8.0))

res14: Array[Long] = Array(5, 3)

val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3)

a.histogram(Array(0.0, 5.0, 10.0))

res1: Array[Long] = Array(6, 9)

a.histogram(Array(0.0, 5.0, 10.0, 15.0))

res1: Array[Long] = Array(6, 8, 1)

id

Retrieves the ID which has been assigned to the RDD by its SparkContext.

Listing Variants

val id: Int

Example

val y = sc.parallelize(1 to 10, 10)

y.id

res16: Int = 19

intersection

Returns the elements in the two RDDs which are the same.

Listing Variants

def intersection(other: RDD[T], numPartitions: Int): RDD[T]

def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

def intersection(other: RDD[T]): RDD[T]

Example

val x = sc.parallelize(1 to 20)

val y = sc.parallelize(10 to 30)

val z = x.intersection(y)

z.collect

res74: Array[Int] = Array(16, 12, 20, 13, 17, 14, 18, 10, 19, 15, 11)

isCheckpointed

Indicates whether the RDD has been checkpointed. The flag is only raised once the checkpoint has actually been created.

Listing Variants

def isCheckpointed: Boolean

Example

sc.setCheckpointDir("/home/cloudera/Documents")

val c = sc.parallelize(1 to 4)   // c can be any RDD

c.isCheckpointed

res6: Boolean = false

c.checkpoint

c.isCheckpointed

res8: Boolean = false

c.collect

c.isCheckpointed

res9: Boolean = true

iterator

Returns a compatible iterator object for a partition of this RDD. This function should never be called directly.

Listing Variants

final def iterator(split: Partition, context: TaskContext): Iterator[T]

join [Pair]

Performs an inner join using two key-value RDDs. Please note that the keys must be generally comparable to make this work.

Listing Variants

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

Example

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

val b = a.keyBy(_.length)

val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)

val d = c.keyBy(_.length)

b.join(d).collect

res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))

keyBy

Constructs two-component tuples (key-value pairs) by applying a function on each data item. The result of the function becomes the key and the original data item becomes the value of the newly created tuples.

Listing Variants

def keyBy[K](f: T => K): RDD[(K, T)]

Example

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

val b = a.keyBy(_.length)

b.collect

res26: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))

keys [Pair]

Extracts the keys from all contained tuples and returns them in a new RDD.

Listing Variants

def keys: RDD[K]

Example

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

val b = a.map(x => (x.length, x))

b.keys.collect

res2: Array[Int] = Array(3, 5, 4, 3, 7, 5)

leftOuterJoin [Pair]

Performs a left outer join using two key-value RDDs. Please note that the keys must be generally comparable to make this work correctly.

Listing Variants

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]

def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

Example

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

val b = a.keyBy(_.length)

val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)

val d = c.keyBy(_.length)

b.leftOuterJoin(d).collect

res1: Array[(Int, (String, Option[String]))] = Array((6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), (3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), (8,(elephant,None)))

lookup [Pair]

Scans the RDD for all entries whose key matches the provided key and returns their values as a Scala sequence.

Listing Variants

def lookup(key: K): Seq[V]

Example

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

val b = a.map(x => (x.length, x))

b.lookup(5)

res0: Seq[String] = WrappedArray(tiger, eagle)

map

Applies a transformation function on each item of the RDD and returns the result as a new RDD.

Listing Variants

def map[U: ClassTag](f: T => U): RDD[U]

Example

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

val b = a.map(_.length)

val c = a.zip(b)

c.collect

res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))

mapPartitions

This is a specialized map that is called only once for each partition. The entire content of the respective partition is available as a sequential stream of values via the input argument (Iterator[T]). The custom function must return yet another Iterator[U]. The combined result iterators are automatically converted into a new RDD. Please note that the tuples (3,4) and (6,7) are missing from the result of Example 1 below due to the partitioning we chose.

Listing Variants

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

Example 1

val a = sc.parallelize(1 to 9, 3)

def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {

var res = List[(T, T)]()

var pre = iter.next

while (iter.hasNext)

{

val cur = iter.next;

res .::= (pre, cur)

pre = cur;

}

res.iterator

}

a.mapPartitions(myfunc).collect

res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

Example 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)

def myfunc(iter: Iterator[Int]) : Iterator[Int] = {

var res = List[Int]()

while (iter.hasNext) {

val cur = iter.next;

res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)

}

res.iterator

}

x.mapPartitions(myfunc).collect

// some of the numbers are not output at all, because the random number generated for them is zero.

res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)

The above program can also be written using flatMap as follows.

Example 2 using flatmap

val x  = sc.parallelize(1 to 10, 3)

x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)

mapPartitionsWithContext (deprecated and developer API)

Similar to mapPartitions, but allows accessing information about the processing state within the mapper.

Listing Variants

def mapPartitionsWithContext[U: ClassTag](f: (TaskContext, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

Example

val a = sc.parallelize(1 to 9, 3)

import org.apache.spark.TaskContext

def myfunc(tc: TaskContext, iter: Iterator[Int]) : Iterator[Int] = {

tc.addOnCompleteCallback(() => println(

"Partition: "     + tc.partitionId +

", AttemptID: "   + tc.attemptId ))


iter.toList.filter(_ % 2 == 0).iterator

}

a.mapPartitionsWithContext(myfunc).collect

14/04/01 23:05:48 INFO SparkContext: Starting job: collect at :20

...

14/04/01 23:05:48 INFO Executor: Running task ID 0

Partition: 0, AttemptID: 0, Interrupted: false

...

14/04/01 23:05:48 INFO Executor: Running task ID 1

14/04/01 23:05:48 INFO TaskSetManager: Finished TID 0 in 470 ms on localhost (progress: 0/3)

...

14/04/01 23:05:48 INFO Executor: Running task ID 2

14/04/01 23:05:48 INFO TaskSetManager: Finished TID 1 in 23 ms on localhost (progress: 1/3)

14/04/01 23:05:48 INFO DAGScheduler: Completed ResultTask(0, 1)


res0: Array[Int] = Array(2, 6, 4, 8)

mapPartitionsWithIndex

Similar to mapPartitions, but takes two parameters. The first parameter is the index of the partition and the second is an iterator through all the items within this partition. The output is an iterator containing the list of items after applying whatever transformation the function encodes.

Listing Variants

def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

Example

val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)

def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {

iter.map(x => index + "," + x)

}

x.mapPartitionsWithIndex(myfunc).collect()

res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)

mapPartitionsWithSplit

This method has been marked as deprecated in the API. So, you should not use this method anymore. Deprecated methods will not be covered in this document.

Listing Variants

def mapPartitionsWithSplit[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

mapValues [Pair]

Takes the values of a RDD that consists of two-component tuples, and applies the provided function to transform each value. Then, it forms new two-component tuples using the key and the transformed value and stores them in a new RDD.

Listing Variants

def mapValues[U](f: V => U): RDD[(K, U)]

Example

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

val b = a.map(x => (x.length, x))

b.mapValues("x" + _ + "x").collect

res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))

mapWith (deprecated)

This is an extended version of map. It takes two function arguments. The first argument must conform to Int -> A and is executed once per partition; it maps the partition index to some value of type A. This is a good place to run initialization code once per partition, such as creating a random number generator object. The second function must conform to (T, A) -> U, where T is a data item of the RDD and A is the transformed partition index, and has to return a transformed data item of type U.

Listing Variants

def mapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]

Example

// generates 9 random numbers less than 1000. 

val x = sc.parallelize(1 to 9, 3)

x.mapWith(a => new scala.util.Random)((x, r) => r.nextInt(1000)).collect

res0: Array[Int] = Array(940, 51, 779, 742, 757, 982, 35, 800, 15)

val a = sc.parallelize(1 to 9, 3)

val b = a.mapWith("Index:" + _)((a, b) => ("Value:" + a, b))

b.collect

res0: Array[(String, String)] = Array((Value:1,Index:0), (Value:2,Index:0), (Value:3,Index:0), (Value:4,Index:1), (Value:5,Index:1), (Value:6,Index:1), (Value:7,Index:2), (Value:8,Index:2), (Value:9,Index:2))

max

Returns the largest element in the RDD.

Listing Variants

def max()(implicit ord: Ordering[T]): T

Example

val y = sc.parallelize(10 to 30)

y.max

res75: Int = 30

val a = sc.parallelize(List((10, "dog"), (3, "tiger"), (9, "lion"), (18, "cat")))

a.max

res6: (Int, String) = (18,cat)

mean [Double], meanApprox [Double]

Calls stats and extracts the mean component. The approximate version of the function can finish somewhat faster in some scenarios. However, it trades accuracy for speed.

Listing Variants

def mean(): Double

def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]

Example

val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3)

a.mean

res0: Double = 5.3

min

Returns the smallest element in the RDD.

Listing Variants

def min()(implicit ord: Ordering[T]): T

Example

val y = sc.parallelize(10 to 30)

y.min

res75: Int = 10

val a = sc.parallelize(List((10, "dog"), (3, "tiger"), (9, "lion"), (8, "cat")))

a.min

res4: (Int, String) = (3,tiger)

name, setName

Allows a RDD to be tagged with a custom name.

Listing Variants

@transient var name: String

def setName(_name: String)

Example

val y = sc.parallelize(1 to 10, 10)

y.name

res13: String = null

y.setName("Fancy RDD Name")

y.name

res15: String = Fancy RDD Name

partitionBy [Pair]

Repartitions the given key-value RDD using its keys. The partitioner implementation can be supplied as the first argument.

Listing Variants

def partitionBy(partitioner: Partitioner): RDD[(K, V)]
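Example

A minimal illustrative sketch (the RDD name is ours): hashing a small key-value RDD into two partitions with the built-in HashPartitioner.

val pairRDD = sc.parallelize(List(("cat", 2), ("cat", 5), ("mouse", 4), ("dog", 12)), 3)

val hashed = pairRDD.partitionBy(new org.apache.spark.HashPartitioner(2))

hashed.partitions.length   // 2, and all pairs with the same key now sit in the same partition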

partitioner 

Specifies the (optional) Partitioner of this RDD. It is used as the default partitioner for functions such as groupBy, subtract and reduceByKey (from PairRDDFunctions).

Listing Variants

@transient val partitioner: Option[Partitioner]
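Example

A brief illustrative sketch: plain parallelized collections carry no partitioner, whereas the output of a key-based shuffle such as groupByKey does.

val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)), 2)

pairs.partitioner   // None

pairs.groupByKey(4).partitioner   // Some(HashPartitioner) with 4 partitions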

partitions 

Returns an array of the partition objects associated with this RDD.

Listing Variants

final def partitions: Array[Partition]

Example

val b = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)

b.partitions

res48: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@18aa, org.apache.spark.rdd.ParallelCollectionPartition@18ab)

persist, cache 

These functions can be used to adjust the storage level of a RDD. When freeing up memory, Spark will use the storage level identifier to decide which partitions should be kept. The parameterless variants persist() and cache() are just abbreviations for persist(StorageLevel.MEMORY_ONLY). (Warning: Once the storage level has been changed, it cannot be changed again!)

Listing Variants

def cache(): RDD[T]

def persist(): RDD[T]

def persist(newLevel: StorageLevel): RDD[T]

Example

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)

c.getStorageLevel

res0: org.apache.spark.storage.StorageLevel = StorageLevel(false, false, false, false, 1)

c.cache

c.getStorageLevel

res2: org.apache.spark.storage.StorageLevel = StorageLevel(false, true, false, true, 1)

pipe 

Takes the RDD data of each partition and sends it via stdin to a shell-command. The resulting output of the command is captured and returned as a RDD of string values.

Listing Variants

def pipe(command: String): RDD[String]

def pipe(command: String, env: Map[String, String]): RDD[String]

def pipe(command: Seq[String], env: Map[String, String] = Map(), printPipeContext: (String => Unit) => Unit = null, printRDDElement: (T, String => Unit) => Unit = null): RDD[String]

Example

val a = sc.parallelize(1 to 9, 3)

a.pipe("head -n 1").collect

res2: Array[String] = Array(1, 4, 7)

randomSplit 

Randomly splits an RDD into multiple smaller RDDs according to a weights Array which specifies the percentage of the total data elements that is assigned to each smaller RDD. Note that the actual size of each smaller RDD is only approximately equal to the percentage specified by the weights Array. The second example below shows that the number of items in each smaller RDD does not exactly match the weights Array. An optional random seed can be specified. This function is useful for splitting data into a training set and a testing set for machine learning.

Listing Variants

def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]

Example

val y = sc.parallelize(1 to 10)

val splits = y.randomSplit(Array(0.6, 0.4), seed = 11L)

val training = splits(0)

val test = splits(1)

training.collect

res85: Array[Int] = Array(1, 4, 5, 6, 8, 10)

test.collect

res86: Array[Int] = Array(2, 3, 7, 9)

val y = sc.parallelize(1 to 10)

val splits = y.randomSplit(Array(0.1, 0.3, 0.6))

val rdd1 = splits(0)

val rdd2 = splits(1)

val rdd3 = splits(2)

rdd1.collect

res87: Array[Int] = Array(4, 10)

rdd2.collect

res88: Array[Int] = Array(1, 3, 5, 8)

rdd3.collect

res91: Array[Int] = Array(2, 6, 7, 9)

reduce 

This function provides the well-known reduce functionality in Spark. Please note that any function f you provide should be commutative and associative in order to generate reproducible results.

Listing Variants

def reduce(f: (T, T) => T): T

Example

val a = sc.parallelize(1 to 100, 3)

a.reduce(_ + _)

res41: Int = 5050

reduceByKey [Pair],  reduceByKeyLocally [Pair], reduceByKeyToDriver [Pair]

This function provides the well-known reduce functionality in Spark, applied separately to the values of each key. Please note that any function f you provide should be commutative and associative in order to generate reproducible results.

Listing Variants

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

def reduceByKeyLocally(func: (V, V) => V): Map[K, V]

def reduceByKeyToDriver(func: (V, V) => V): Map[K, V]

Example

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)

val b = a.map(x => (x.length, x))

b.reduceByKey(_ + _).collect

res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

val b = a.map(x => (x.length, x))

b.reduceByKey(_ + _).collect

res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))

repartition

This function changes the number of partitions to the number specified by the numPartitions parameter.

Listing Variants

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

Example

val rdd = sc.parallelize(List(1, 2, 10, 4, 5, 2, 1, 1, 1), 3)

rdd.partitions.length

res2: Int = 3

val rdd2  = rdd.repartition(5)

rdd2.partitions.length

res6: Int = 5

repartitionAndSortWithinPartitions [Ordered]

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys.

Listing Variants

def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

Example

// first we will do range partitioning alone, which does not sort records within partitions

val randRDD = sc.parallelize(List( (2,"cat"), (6, "mouse"),(7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)

val rPartitioner = new org.apache.spark.RangePartitioner(3, randRDD)

val partitioned = randRDD.partitionBy(rPartitioner)

def myfunc(index: Int, iter: Iterator[(Int, String)]) : Iterator[String] = {

iter.map(x => "[partID:" +  index + ", val: " + x + "]")

}

partitioned.mapPartitionsWithIndex(myfunc).collect

res0: Array[String] = Array([partID:0, val: (2,cat)], [partID:0, val: (3,book)], [partID:0, val: (1,screen)], [partID:1, val: (4,tv)], [partID:1, val: (5,heater)], [partID:2, val: (6,mouse)], [partID:2, val: (7,cup)])

// now let's repartition again, but this time sort the records within each partition

val partitioned = randRDD.repartitionAndSortWithinPartitions(rPartitioner)

def myfunc(index: Int, iter: Iterator[(Int, String)]) : Iterator[String] = {

iter.map(x => "[partID:" +  index + ", val: " + x + "]")

}

partitioned.mapPartitionsWithIndex(myfunc).collect

res1: Array[String] = Array([partID:0, val: (1,screen)], [partID:0, val: (2,cat)], [partID:0, val: (3,book)], [partID:1, val: (4,tv)], [partID:1, val: (5,heater)], [partID:2, val: (6,mouse)], [partID:2, val: (7,cup)])

rightOuterJoin [Pair]

Performs a right outer join using two key-value RDDs. Please note that the keys must be generally comparable to make this work correctly.

Listing Variants

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]

def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]

def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]

Example

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

val b = a.keyBy(_.length)

val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)

val d = c.keyBy(_.length)

b.rightOuterJoin(d).collect

res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)), (4,(None,bear)))

sample

Randomly selects a fraction of the items of an RDD and returns them in a new RDD.

Listing Variants

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

Example

val a = sc.parallelize(1 to 10000, 3)

a.sample(false, 0.1, 0).count

res24: Long = 960

a.sample(true, 0.3, 0).count

res25: Long = 2888

a.sample(true, 0.3, 13).count

res26: Long = 2985

sampleByKey [Pair]

Randomly samples the key-value pair RDD according to the fractions you specify for each key, i.e. approximately that fraction of each key's values appears in the final RDD.

Listing Variants

def sampleByKey(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]

Example

val randRDD = sc.parallelize(List( (7,"cat"), (6, "mouse"),(7, "cup"), (6, "book"), (7, "tv"), (6, "screen"), (7, "heater")))

val sampleMap = List((7, 0.4), (6, 0.6)).toMap

randRDD.sampleByKey(false, sampleMap,42).collect

res6: Array[(Int, String)] = Array((7,cat), (6,mouse), (6,book), (6,screen), (7,heater))

sampleByKeyExact [Pair, experimental]

This is labelled as experimental in Spark 1.4, so use it with care. Unlike sampleByKey, it makes additional passes over the RDD so that the sample size for each key exactly matches math.ceil(numItems * samplingRate). A minimal sketch follows the signature below.

Listing Variants

def sampleByKeyExact(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]
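A minimal sketch, reusing the data from the sampleByKey example above; since the operator is experimental, treat the behaviour as indicative only:

val randRDD = sc.parallelize(List( (7,"cat"), (6, "mouse"),(7, "cup"), (6, "book"), (7, "tv"), (6, "screen"), (7, "heater")))

val sampleMap = List((7, 0.4), (6, 0.6)).toMap

randRDD.sampleByKeyExact(false, sampleMap, 42).collect

// exactly ceil(4 * 0.4) = 2 values for key 7 and ceil(3 * 0.6) = 2 values for key 6 are expected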

saveAsHadoopFile [Pair], saveAsHadoopDataset [Pair], saveAsNewAPIHadoopFile [Pair]

Saves the RDD in a Hadoop compatible format using any Hadoop outputFormat class the user specifies.

Listing Variants

def saveAsHadoopDataset(conf: JobConf)

def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F])

def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F])

def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec])

def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None)

def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F])

def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration)
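Example

The original page gives no example for this family of functions, so here is a minimal sketch using saveAsNewAPIHadoopFile with the new-API TextOutputFormat; the output path and the wrapping of keys and values into Hadoop Writables are assumptions, and any other OutputFormat can be substituted:

import org.apache.hadoop.io.{IntWritable, Text}

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

val v = sc.parallelize(Array(("owl",3), ("gnu",4), ("dog",1), ("cat",2), ("ant",5)), 2)

// wrap keys and values in Writables so they match the OutputFormat's type parameters

val w = v.map { case (k, c) => (new Text(k), new IntWritable(c)) }

w.saveAsNewAPIHadoopFile[TextOutputFormat[Text, IntWritable]]("hadoop_text_file")

// produces one part file per partition, with each line formatted as "key<TAB>value"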

saveAsObjectFile

Saves the RDD in binary format.

Listing Variants

def saveAsObjectFile(path: String)

Example

val x = sc.parallelize(1 to 100, 3)

x.saveAsObjectFile("objFile")

val y = sc.objectFile[Int]("objFile")

y.collect

res52: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

saveAsSequenceFile [SeqFile]

Saves the RDD as a Hadoop sequence file.

Listing Variants

def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None)

Example

val v = sc.parallelize(Array(("owl",3), ("gnu",4), ("dog",1), ("cat",2), ("ant",5)), 2)

v.saveAsSequenceFile("hd_seq_file")

14/04/19 05:45:43 INFO FileOutputCommitter: Saved output of task 'attempt_201404190545_0000_m_000001_191' to file:/home/cloudera/hd_seq_file

[cloudera@localhost ~]$ ll ~/hd_seq_file

total 8

-rwxr-xr-x 1 cloudera cloudera 117 Apr 19 05:45 part-00000

-rwxr-xr-x 1 cloudera cloudera 133 Apr 19 05:45 part-00001

-rwxr-xr-x 1 cloudera cloudera   0 Apr 19 05:45 _SUCCESS

saveAsTextFile

Saves the RDD as text files, writing one element per line.

Listing Variants

def saveAsTextFile(path: String)

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec])

Example without compression

val a = sc.parallelize(1 to 10000, 3)

a.saveAsTextFile("mydata_a")

14/04/03 21:11:36 INFO FileOutputCommitter: Saved output of task 'attempt_201404032111_0000_m_000002_71' to file:/home/cloudera/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a

[cloudera@localhost ~]$ head -n 5 ~/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a/part-00000

1

2

3

4

5

// Produces 3 output files since we created the RDD with 3 partitions

[cloudera@localhost ~]$ ll ~/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a/

-rwxr-xr-x 1 cloudera cloudera 15558 Apr  3 21:11 part-00000

-rwxr-xr-x 1 cloudera cloudera 16665 Apr  3 21:11 part-00001

-rwxr-xr-x 1 cloudera cloudera 16671 Apr  3 21:11 part-00002

Example with compression

import org.apache.hadoop.io.compress.GzipCodec

a.saveAsTextFile("mydata_b", classOf[GzipCodec])

[cloudera@localhost ~]$ ll ~/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_b/

total 24

-rwxr-xr-x 1 cloudera cloudera 7276 Apr  3 21:29 part-00000.gz

-rwxr-xr-x 1 cloudera cloudera 6517 Apr  3 21:29 part-00001.gz

-rwxr-xr-x 1 cloudera cloudera 6525 Apr  3 21:29 part-00002.gz

val x = sc.textFile("mydata_b")

x.count

res2: Long = 10000

Example writing into HDFS

val x = sc.parallelize(List(1,2,3,4,5,6,6,7,9,8,10,21), 3)

x.saveAsTextFile("hdfs://localhost:8020/user/cloudera/test");

val sp = sc.textFile("hdfs://localhost:8020/user/cloudera/sp_data")

sp.flatMap(_.split(" ")).saveAsTextFile("hdfs://localhost:8020/user/cloudera/sp_x")

stats [Double]

Simultaneously computes the count, mean, variance and standard deviation of all values in the RDD and returns them in a StatCounter.

Listing Variants

def stats(): StatCounter

Example

val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)

x.stats

res16: org.apache.spark.util.StatCounter = (count: 9, mean: 11.266667, stdev: 8.126859)
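The individual components of the returned StatCounter can also be read directly (field names as defined in org.apache.spark.util.StatCounter):

val s = x.stats

s.count // 9

s.mean // 11.266...

s.stdev // 8.126...

s.variance

s.sampleVariance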

sortBy

This function sorts the input RDD's data and stores it in a new RDD. The first parameter is a function that maps each input element to the key you want to sort by. The second (optional) parameter specifies whether you want the data sorted in ascending or descending order.

Listing Variants

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.size)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

Example

val y = sc.parallelize(Array(5, 7, 1, 3, 2, 1))

y.sortBy(c => c, true).collect

res101: Array[Int] = Array(1, 1, 2, 3, 5, 7)

y.sortBy(c => c, false).collect

res102: Array[Int] = Array(7, 5, 3, 2, 1, 1)

val z = sc.parallelize(Array(("H", 10), ("A", 26), ("Z", 1), ("L", 5)))

z.sortBy(c => c._1, true).collect

res109: Array[(String, Int)] = Array((A,26), (H,10), (L,5), (Z,1))

z.sortBy(c => c._2, true).collect

res108: Array[(String, Int)] = Array((Z,1), (L,5), (H,10), (A,26))

sortByKey [Ordered]

This function sorts the input RDD's data and stores it in a new RDD. The output RDD is a shuffled RDD, because the data has to be redistributed across partitions. The implementation is actually quite clever: it first uses a range partitioner to partition the data into ranges within the shuffled RDD, and then sorts these ranges individually with mapPartitions using standard sort mechanisms.

Listing Variants

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P]

Example

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)

val b = sc.parallelize(1 to a.count.toInt, 2)

val c = a.zip(b)

c.sortByKey(true).collect

res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))

c.sortByKey(false).collect

res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))

val a = sc.parallelize(1 to 100, 5)

val b = a.cartesian(a)

val c = sc.parallelize(b.takeSample(true, 5, 13), 2)

val d = c.sortByKey(false)

d.collect

res56: Array[(Int, Int)] = Array((96,9), (84,76), (59,59), (53,65), (52,4))
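To see the range partitioning described above in action, the partitions of a sorted RDD can be labelled with mapPartitionsWithIndex. A minimal sketch (the names words and pairs are illustrative, and the exact partition boundaries depend on the range partitioner's sampling):

val words = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)

val pairs = words.zip(sc.parallelize(1 to 5, 2))

def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {

iter.map(x => "[partID:" +  index + ", val: " + x + "]")

}

pairs.sortByKey(true).mapPartitionsWithIndex(myfunc).collect

// every key in partition 0 sorts before every key in partition 1, for example:

// Array([partID:0, val: (ant,5)], [partID:0, val: (cat,2)], [partID:1, val: (dog,1)], [partID:1, val: (gnu,4)], [partID:1, val: (owl,3)])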

stdev [Double], sampleStdev [Double]

Calls stats and extracts either the stdev component or the corrected sampleStdev component.

Listing Variants

def stdev(): Double

def sampleStdev(): Double

Example

val d = sc.parallelize(List(0.0, 0.0, 0.0), 3)

d.stdev

res10: Double = 0.0

d.sampleStdev

res11: Double = 0.0

val d = sc.parallelize(List(0.0, 1.0), 3)

d.stdev

res18: Double = 0.5

d.sampleStdev

res19: Double = 0.7071067811865476

val d = sc.parallelize(List(0.0, 0.0, 1.0), 3)

d.stdev

res14: Double = 0.4714045207910317

d.sampleStdev

res15: Double = 0.5773502691896257

subtract

Performs the well-known standard set subtraction operation: A - B.

Listing Variants

def subtract(other: RDD[T]): RDD[T]

def subtract(other: RDD[T], numPartitions: Int): RDD[T]

def subtract(other: RDD[T], p: Partitioner): RDD[T]

Example

val a = sc.parallelize(1 to 9, 3)

val b = sc.parallelize(1 to 3, 3)

val c = a.subtract(b)

c.collect

res3: Array[Int] = Array(6, 9, 4, 7, 5, 8)

subtractByKey [Pair]

Very similar to subtract, but instead of supplying a function, the key component of each pair is automatically used as the criterion for removing items from the first RDD.

Listing Variants

def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]

def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]

def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]

Example

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)

val b = a.keyBy(_.length)

val c = sc.parallelize(List("ant", "falcon", "squid"), 2)

val d = c.keyBy(_.length)

b.subtractByKey(d).collect

res15: Array[(Int, String)] = Array((4,lion))

sum [Double], sumApprox [Double]

Computes the sum of all values contained in the RDD. The approximate version of the function can finish somewhat faster in some scenarios. However, it trades accuracy for speed.

Listing Variants

def sum(): Double

def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]

Example

val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)

x.sum

res17: Double = 101.39999999999999
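A minimal sketch of sumApprox; the timeout is in milliseconds, and getFinalValue blocks until either the timeout expires or the job completes, returning a BoundedDouble (for a tiny RDD like this the approximate result normally equals the exact sum):

val approxSum = x.sumApprox(1000L, 0.95)

approxSum.getFinalValue()

// returns a BoundedDouble holding the estimated sum together with low/high confidence bounds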

take

Extracts the first n items of the RDD and returns them as an array. (Note: This sounds very easy, but it is actually quite a tricky problem for the implementors of Spark because the items in question can be in many different partitions.)

Listing Variants

def take(num: Int): Array[T]

Example

val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)

b.take(2)

res18: Array[String] = Array(dog, cat)

val b = sc.parallelize(1 to 10000, 5000)

b.take(100)

res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

takeOrdered

Orders the data items of the RDD using their inherent implicit ordering function and returns the first n items as an array.

Listing Variants

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

Example

val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)

b.takeOrdered(2)

res19: Array[String] = Array(ape, cat)

takeSample

Behaves differently from sample in the following respects:

  It returns an exact number of samples (hint: 2nd parameter).

  It returns an Array instead of an RDD.

  It internally randomizes the order of the items returned.

Listing Variants

def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

Example

val x = sc.parallelize(1 to 1000, 3)

x.takeSample(true, 100, 1)

res3: Array[Int] = Array(339, 718, 810, 105, 71, 268, 333, 360, 341, 300, 68, 848, 431, 449, 773, 172, 802, 339, 431, 285, 937, 301, 167, 69, 330, 864, 40, 645, 65, 349, 613, 468, 982, 314, 160, 675, 232, 794, 577, 571, 805, 317, 136, 860, 522, 45, 628, 178, 321, 482, 657, 114, 332, 728, 901, 290, 175, 876, 227, 130, 863, 773, 559, 301, 694, 460, 839, 952, 664, 851, 260, 729, 823, 880, 792, 964, 614, 821, 683, 364, 80, 875, 813, 951, 663, 344, 546, 918, 436, 451, 397, 670, 756, 512, 391, 70, 213, 896, 123, 858)

toDebugString

Returns a string that contains debug information about the RDD and its dependencies.

Listing Variants

def toDebugString: String

Example

val a = sc.parallelize(1 to 9, 3)

val b = sc.parallelize(1 to 3, 3)

val c = a.subtract(b)

c.toDebugString

res6: String = 

MappedRDD[15] at subtract at <console>:16 (3 partitions)

SubtractedRDD[14] at subtract at <console>:16 (3 partitions)

MappedRDD[12] at subtract at <console>:16 (3 partitions)

ParallelCollectionRDD[10] at parallelize at <console>:12 (3 partitions)

MappedRDD[13] at subtract at <console>:16 (3 partitions)

ParallelCollectionRDD[11] at parallelize at <console>:12 (3 partitions)

toJavaRDD

Embeds this RDD object within a JavaRDD object and returns it.

Listing Variants

def toJavaRDD() : JavaRDD[T]

Example

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)

c.toJavaRDD

res3: org.apache.spark.api.java.JavaRDD[String] = ParallelCollectionRDD[6] at parallelize at <console>:12

toLocalIterator

Converts the RDD into a Scala iterator on the driver node.

Listing Variants

def toLocalIterator: Iterator[T]

Example

val z = sc.parallelize(List(1,2,3,4,5,6), 2)

val iter = z.toLocalIterator

iter.next

res51: Int = 1

iter.next

res52: Int = 2

top

Utilizes the implicit ordering of T to determine the top k values and returns them as an array.

Listing Variants

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

Example

val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)

c.top(2)

res28: Array[Int] = Array(9, 8)

toString

Assembles a human-readable textual description of the RDD.

Listing Variants

override def toString: String

Example

val z = sc.parallelize(List(1,2,3,4,5,6), 2)

z.toString

res61: String = ParallelCollectionRDD[80] at parallelize at <console>:21

val randRDD = sc.parallelize(List( (7,"cat"), (6, "mouse"),(7, "cup"), (6, "book"), (7, "tv"), (6, "screen"), (7, "heater")))

val sortedRDD = randRDD.sortByKey()

sortedRDD.toString

res64: String = ShuffledRDD[88] at sortByKey at <console>:23

treeAggregate

Computes the same thing as aggregate, except it aggregates the elements of the RDD in a multi-level tree pattern. Another difference is that it does not use the initial value for the second reduce function (combOp).  By default a tree of depth 2 is used, but this can be changed via the depth parameter.

Listing Variants

def treeAggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U, depth: Int = 2)(implicit arg0: ClassTag[U]): U

Example

val z = sc.parallelize(List(1,2,3,4,5,6), 2)

// let's first print out the contents of the RDD with partition labels

def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {

iter.map(x => "[partID:" +  index + ", val: " + x + "]")

}

z.mapPartitionsWithIndex(myfunc).collect

res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])

z.treeAggregate(0)(math.max(_, _), _ + _)

res40: Int = 9

// Note: unlike normal aggregate, treeAggregate does not apply the initial value for the second reduce

// This example returns 11 since the initial value is 5

// reduce of partition 0 will be max(5, 1, 2, 3) = 5

// reduce of partition 1 will be max(5, 4, 5, 6) = 6

// final reduce across partitions will be 5 + 6 = 11

// note the final reduce does not include the initial value

z.treeAggregate(5)(math.max(_, _), _ + _)

res42: Int = 11

treeReduce

Works like reduce, except it reduces the elements of the RDD in a multi-level tree pattern.

Listing Variants

def treeReduce(f: (T, T) ⇒ T, depth: Int = 2): T

Example

val z = sc.parallelize(List(1,2,3,4,5,6), 2)

z.treeReduce(_+_)

res49: Int = 21

union, ++

Performs the standard set operation A union B. Note that duplicates are not removed; the result simply contains every element of both RDDs (see the second example below if true set semantics are needed).

Listing Variants

def ++(other: RDD[T]): RDD[T]

def union(other: RDD[T]): RDD[T]

Example

val a = sc.parallelize(1 to 3, 1)

val b = sc.parallelize(5 to 7, 1)

(a ++ b).collect

res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)
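As noted above, union keeps duplicates. A minimal sketch (chaining distinct is only one assumed way to obtain true set semantics):

val c = sc.parallelize(List(1, 2, 3), 1)

val d = sc.parallelize(List(3, 4), 1)

(c union d).collect

// Array(1, 2, 3, 3, 4) -- the duplicate 3 is kept

(c union d).distinct.collect

// contains each element once; the ordering after the shuffle is not guaranteed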

unpersist

Dematerializes the RDD (i.e. erases all persisted data items from disk and memory). The RDD object itself remains, however, and if it is referenced in a later computation, Spark regenerates it automatically using the stored lineage information.

Listing Variants

def unpersist(blocking: Boolean = true): RDD[T]

Example

val y = sc.parallelize(1 to 10, 10)

val z = (y++y)

z.collect

z.unpersist(true)

14/04/19 03:04:57 INFO UnionRDD: Removing RDD 22 from persistence list

14/04/19 03:04:57 INFO BlockManager: Removing RDD 22

values [Pair]

Extracts the values from all contained tuples and returns them in a new RDD.

Listing Variants

def values: RDD[V]

Example

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

val b = a.map(x => (x.length, x))

b.values.collect

res3: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)

variance [Double], sampleVariance [Double]

Calls stats and extracts either the variance component or the corrected sampleVariance component.

Listing Variants

def variance(): Double

def sampleVariance(): Double

Example

val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3)

a.variance

res70: Double = 10.605333333333332

val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)

x.variance

res14: Double = 66.04584444444443

x.sampleVariance

res13: Double = 74.30157499999999

zip

Joins two RDDs by pairing the i-th element of one with the i-th element of the other. Both RDDs must have the same number of partitions and the same number of elements in each partition. The resulting RDD consists of two-component tuples, which are interpreted as key-value pairs by the methods provided by the PairRDDFunctions extension.

Listing Variants

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

Example

val a = sc.parallelize(1 to 100, 3)

val b = sc.parallelize(101 to 200, 3)

a.zip(b).collect

res1: Array[(Int, Int)] = Array((1,101), (2,102), (3,103), (4,104), (5,105), (6,106), (7,107), (8,108), (9,109), (10,110), (11,111), (12,112), (13,113), (14,114), (15,115), (16,116), (17,117), (18,118), (19,119), (20,120), (21,121), (22,122), (23,123), (24,124), (25,125), (26,126), (27,127), (28,128), (29,129), (30,130), (31,131), (32,132), (33,133), (34,134), (35,135), (36,136), (37,137), (38,138), (39,139), (40,140), (41,141), (42,142), (43,143), (44,144), (45,145), (46,146), (47,147), (48,148), (49,149), (50,150), (51,151), (52,152), (53,153), (54,154), (55,155), (56,156), (57,157), (58,158), (59,159), (60,160), (61,161), (62,162), (63,163), (64,164), (65,165), (66,166), (67,167), (68,168), (69,169), (70,170), (71,171), (72,172), (73,173), (74,174), (75,175), (76,176), (77,177), (78,...

val a = sc.parallelize(1 to 100, 3)

val b = sc.parallelize(101 to 200, 3)

val c = sc.parallelize(201 to 300, 3)

a.zip(b).zip(c).map((x) => (x._1._1, x._1._2, x._2 )).collect

res12: Array[(Int, Int, Int)] = Array((1,101,201), (2,102,202), (3,103,203), (4,104,204), (5,105,205), (6,106,206), (7,107,207), (8,108,208), (9,109,209), (10,110,210), (11,111,211), (12,112,212), (13,113,213), (14,114,214), (15,115,215), (16,116,216), (17,117,217), (18,118,218), (19,119,219), (20,120,220), (21,121,221), (22,122,222), (23,123,223), (24,124,224), (25,125,225), (26,126,226), (27,127,227), (28,128,228), (29,129,229), (30,130,230), (31,131,231), (32,132,232), (33,133,233), (34,134,234), (35,135,235), (36,136,236), (37,137,237), (38,138,238), (39,139,239), (40,140,240), (41,141,241), (42,142,242), (43,143,243), (44,144,244), (45,145,245), (46,146,246), (47,147,247), (48,148,248), (49,149,249), (50,150,250), (51,151,251), (52,152,252), (53,153,253), (54,154,254), (55,155,255)...

zipPartitions

Similar to zip, but provides more control over the zipping process: instead of pairing up individual elements, you supply a function that receives one iterator per corresponding input partition.

Listing Variants

def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]

def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]

def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]

def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]

def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]

def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]

Example

val a = sc.parallelize(0 to 9, 3)

val b = sc.parallelize(10 to 19, 3)

val c = sc.parallelize(100 to 109, 3)

def myfunc(aiter: Iterator[Int], biter: Iterator[Int], citer: Iterator[Int]): Iterator[String] =

{

var res = List[String]()

while (aiter.hasNext && biter.hasNext && citer.hasNext)

{

val x = aiter.next + " " + biter.next + " " + citer.next

res ::= x

}

res.iterator

}

a.zipPartitions(b, c)(myfunc).collect

res50: Array[String] = Array(2 12 102, 1 11 101, 0 10 100, 5 15 105, 4 14 104, 3 13 103, 9 19 109, 8 18 108, 7 17 107, 6 16 106)

zipWithIndex

Zips the elements of the RDD with their element indexes. The indexes start from 0. If the RDD is spread across multiple partitions, a Spark job is started to perform this operation.

Listing Variants

def zipWithIndex(): RDD[(T, Long)]

Example

val z = sc.parallelize(Array("A", "B", "C", "D"))

val r = z.zipWithIndex

r.collect

res110: Array[(String, Long)] = Array((A,0), (B,1), (C,2), (D,3))

val z = sc.parallelize(100 to 120, 5)

val r = z.zipWithIndex

r.collect

res11: Array[(Int, Long)] = Array((100,0), (101,1), (102,2), (103,3), (104,4), (105,5), (106,6), (107,7), (108,8), (109,9), (110,10), (111,11), (112,12), (113,13), (114,14), (115,15), (116,16), (117,17), (118,18), (119,19), (120,20))

zipWithUniqueId

This is different from zipWithIndex in that it merely assigns a unique id to each data element; the ids may not match the element's index. Specifically, element k of partition i receives the id i + k * numPartitions, where numPartitions is the total number of partitions. This operation does not start a Spark job even if the RDD is spread across multiple partitions.

Compare the results of the example below with that of the 2nd example of zipWithIndex. You should be able to see the difference.

Listing Variants

def zipWithUniqueId(): RDD[(T, Long)]

Example

val z = sc.parallelize(100 to 120, 5)

val r = z.zipWithUniqueId

r.collect

res12: Array[(Int, Long)] = Array((100,0), (101,5), (102,10), (103,15), (104,1), (105,6), (106,11), (107,16), (108,2), (109,7), (110,12), (111,17), (112,3), (113,8), (114,13), (115,18), (116,4), (117,9), (118,14), (119,19), (120,24))
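To relate these ids to the scheme described above (element k of partition i gets the id i + k * numPartitions), a minimal sketch that labels each element with its partition:

r.mapPartitionsWithIndex((i, iter) => iter.map { case (v, id) => "[partID:" + i + ", val: (" + v + "," + id + ")]" }).collect

// partition 0 yields ids 0, 5, 10, 15; partition 1 yields 1, 6, 11, 16; ... ; partition 4 yields 4, 9, 14, 19, 24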
