Transformations(转换)
下表列出了一些 Spark 常用的 transformations(转换)。详情请参考 RDD API 文档(Scala,Java,Python,R)和 pair RDD 函数文档(Scala,Java)。
Transformation(转换) | Meaning(含义) |
---|---|
map(func) | 返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中的元素应用一个函数 func 来生成。 |
filter(func) | 返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中应用一个函数 func 且返回值为 true 的元素来生成。 |
flatMap(func) | 与 map 类似,但是每一个输入的 item 可以被映射成 0 个或多个输出的 items(所以 func 应该返回一个 Seq 而不是一个单独的 item)。 |
mapPartitions(func) | 与 map 类似,但是单独的运行在在每个 RDD 的 partition(分区,block)上,所以在一个类型为 T 的 RDD 上运行时 func 必须是 Iterator<T> => Iterator<U> 类型。 |
mapPartitionsWithIndex(func) | 与 mapPartitions 类似,但是也需要提供一个代表 partition 的 index(索引)的 interger value(整型值)作为参数的 func,所以在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator<T>) => Iterator<U> 类型。 |
sample(withReplacement, fraction, seed) | 样本数据,设置是否放回(withReplacement),采样的百分比(fraction)、使用指定的随机数生成器的种子(seed)。 |
union(otherDataset) | 反回一个新的 dataset,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的并集。 |
intersection(otherDataset) | 返回一个新的 RDD,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的交集。 |
distinct([numTasks])) | 返回一个新的 dataset,它包含了 source dataset(源数据集)中去重的元素。 |
groupByKey([numTasks]) | 在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) . |
Note: 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKey 或 aggregateByKey 来计算性能会更好. |
|
Note: 默认情况下,并行度取决于父 RDD 的分区数。可以传递一个可选的 numTasks 参数来设置不同的任务数。 |
|
reduceByKey(func, [numTasks]) | 在 (K, V) pairs 的 dataset 上调用时,返回 dataset of (K, V) pairs 的 dataset,其中的 values 是针对每个 key 使用给定的函数 func来进行聚合的,它必须是 type (V,V) => V 的类型。像 groupByKey 一样,reduce tasks 的数量是可以通过第二个可选的参数来配置的。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 在 (K, V) pairs 的 dataset 上调用时,返回 (K, U) pairs 的 dataset,其中的 values 是针对每个 key 使用给定的 combine 函数以及一个 neutral "0" 值来进行聚合的。允许聚合值的类型与输入值的类型不一样,同时避免不必要的配置。像 groupByKey 一样,reduce tasks 的数量是可以通过第二个可选的参数来配置的。 |
sortByKey([ascending], [numTasks]) | 在一个 (K, V) pair 的 dataset 上调用时,其中的 K 实现了 Ordered,返回一个按 keys 升序或降序的 (K, V) pairs 的 dataset,由 boolean 类型的 ascending 参数来指定。 |
join(otherDataset, [numTasks]) | 在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,它拥有每个 key 中所有的元素对。Outer joins 可以通过 leftOuterJoin , rightOuterJoin 和 fullOuterJoin 来实现。 |
cogroup(otherDataset, [numTasks]) | 在一个 (K, V) 和的 dataset 上调用时,返回一个 (K, (Iterable<V>, Iterable<W>)) tuples 的 dataset。这个操作也调用了 groupWith 。 |
cartesian(otherDataset) | 在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) pairs 类型的 dataset(所有元素的 pairs,即笛卡尔积)。 |
pipe(command, [envVars]) | 通过使用 shell 命令来将每个 RDD 的分区给 Pipe。例如,一个 Perl 或 bash 脚本。RDD 的元素会被写入进程的标准输入(stdin),并且 lines(行)输出到它的标准输出(stdout)被作为一个字符串型 RDD 的 string 返回。 |
coalesce(numPartitions) | Decrease(降低)RDD 中 partitions(分区)的数量为 numPartitions。对于执行过滤后一个大的 dataset 操作是更有效的。 |
repartition(numPartitions) | Reshuffle(重新洗牌)RDD 中的数据以创建或者更多的 partitions(分区)并将每个分区中的数据尽量保持均匀。该操作总是通过网络来 shuffles 所有的数据。 |
repartitionAndSortWithinPartitions(partitioner) | 根据给定的 partitioner(分区器)对 RDD 进行重新分区,并在每个结果分区中,按照 key 值对记录排序。这比每一个分区中先调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作的机器上进行。 |
Actions(动作)
下表列出了一些 Spark 常用的 actions 操作。详细请参考 RDD API 文档(Scala,Java,Python,R)
Action(动作) | Meaning(含义) |
---|---|
reduce(func) | 使用函数 func 聚合 dataset 中的元素,这个函数 func 输入为两个元素,返回为一个元素。这个函数应该是可交换(commutative)和关联(associative)的,这样才能保证它可以被并行地正确计算。 |
collect() | 在 driver 程序中,以一个 array 数组的形式返回 dataset 的所有元素。这在过滤器(filter)或其他操作(other operation)之后返回足够小(sufficiently small)的数据子集通常是有用的。 |
count() | 返回 dataset 中元素的个数。 |
first() | 返回 dataset 中的第一个元素(类似于 take(1)。 |
take(n) | 将数据集中的前 n 个元素作为一个 array 数组返回。 |
takeSample(withReplacement, num, [seed]) | 对一个 dataset 进行随机抽样,返回一个包含 num 个随机抽样(random sample)元素的数组,参数 withReplacement 指定是否有放回抽样,参数 seed 指定生成随机数的种子。 |
takeOrdered(n, [ordering]) | 返回 RDD 按自然顺序(natural order)或自定义比较器(custom comparator)排序后的前 n 个元素。 |
saveAsTextFile(path) | 将 dataset 中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录。 |
saveAsSequenceFile(path) | |
(Java and Scala) | 将 dataset 中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统指定的路径中。该操作可以在实现了 Hadoop 的 Writable 接口的键值对(key-value pairs)的 RDD 上使用。在 Scala 中,它还可以隐式转换为 Writable 的类型(Spark 包括了基本类型的转换,例如 Int,Double,String 等等)。 |
saveAsObjectFile(path) | |
(Java and Scala) | 使用 Java 序列化(serialization)以简单的格式(simple format)编写数据集的元素,然后使用 SparkContext.objectFile() 进行加载。 |
countByKey() | 仅适用于(K,V)类型的 RDD。返回具有每个 key 的计数的(K , Int)pairs 的 hashmap。 |
foreach(func) | 对 dataset 中每个元素运行函数 func。这通常用于副作用(side effects),例如更新一个 Accumulator(累加器)或与外部存储系统(external storage systems)进行交互。Note:修改除 foreach() 之外的累加器以外的变量(variables)可能会导致未定义的行为(undefined behavior)。详细介绍请阅读 Understanding closures(理解闭包) 部分。 |
该 Spark RDD API 还暴露了一些 actions(操作)的异步版本,例如针对 foreach
的 foreachAsync
,它们会立即返回一个FutureAction
到调用者,而不是在完成 action 时阻塞。这可以用于管理或等待 action 的异步执行。.