pySpark 中文API (1)

http://spark.apache.org/docs/latest/api/python/index.html

pyspark软件包

子包

pyspark.sql模块

pyspark.streaming模块

pyspark.ml包

pyspark.mllib包

内容

PySpark是Spark的Python API。

公开课:

SparkContext

Spark功能的主要入口点。

RDD

弹性分布式数据集(RDD),Spark中的基本抽象。

Broadcast

一个广播变量,可以在任务之间重用。

Accumulator

任务只能添加值的“只添加”共享变量。

SparkConf

用于配置Spark。

SparkFiles

访问作业附带的文件。

StorageLevel

更细粒度的缓存持久性级别。

TaskContext

关于当前正在运行的任务的信息,可在工人和实验室中获得。

class pyspark.SparkConf(loadDefaults = True_jvm = None_jconf = None [source]

Spark应用程序的配置。用于将各种Spark参数设置为键值对。

大多数情况下,您将创建一个SparkConf对象SparkConf(),该对象 将从spark。* Java系统属性中加载值。在这种情况下,您直接在SparkConf对象上设置的任何参数都优先于系统属性。

对于单元测试,您也可以调用SparkConf(false)跳过加载外部设置并获取相同的配置,而不管系统属性如何。

这个类中的所有setter方法都支持链接。例如,您可以编写conf.setMaster(“local”)。setAppName(“My app”)。

注意


一旦将SparkConf对象传递给Spark,它就会被克隆,并且不能再由用户修改。

contains(key [source]

此配置是否包含给定的密钥?

get(keydefaultValue = None [source]

获取某个键的配置值,否则返回默认值。

getAll()[source]

获取所有值作为键值对的列表。

set(keyvalue [source]

设置配置属性。

setAll([来源]

设置多个参数,作为键值对列表传递。

参数: - 要设置的键值列表

setAppName(价值[来源]

设置应用程序名称

setExecutorEnv(key = Nonevalue = Nonepairs = None [source]

设置要传递给执行者的环境变量。

setIfMissing(keyvalue [source]

设置配置属性(如果尚未设置)。

setMaster(价值[来源]

设置要连接的主要URL。

setSparkHome(价值[来源]

设置工作站节点上安装Spark的路径。

toDebugString()[source]

以键=值对的列表形式返回配置的可打印版本,每行一个。

pyspark.SparkContext(master = NoneappName = NonesparkHome = NonepyFiles = Noneenvironment = NonebatchSize = 0serializer = PickleSerializer()conf = Nonegateway = Nonejsc = Noneprofiler_cls =  [source]

Spark功能的主要入口点。SparkContext表示与Spark集群的连接,并可用于RDD在该集群上创建和广播变量。

PACKAGE_EXTENSIONS=('.zip','.egg','.jar')

accumulator(valueaccum_param = None [source]

创建一个Accumulator给定的初始值,使用给定的 AccumulatorParam帮助对象来定义如何提供数据类型的值。如果您不提供一个,则默认AccumulatorParams用于整数和浮点数。对于其他类型,可以使用自定义的AccumulatorParam。

addFile(pathrecursive = False [source]

在每个节点上使用此Spark作业添加要下载的文件。该path传递可以是本地文件,在HDFS(或其他Hadoop的支持的文件系统)的文件,或HTTP,HTTPS或FTP URI。

要访问Spark作业中的文件,请使用文件名L {SparkFiles.get(fileName)}来查找其下载位置。

如果递归选项设置为True,则可以提供一个目录。当前目录仅支持Hadoop支持的文件系统。

>>> from pyspark import SparkFiles >>> path = os 。路径。加入(tempdir ,“test.txt” )>>> with open (path ,“w” )as testFile :... _ = testFile 。写(“100” )>>> sc 。addFile (路径)>>> DEF FUNC (迭代): 。与开放式(SparkFiles 。获得(“的test.txt” ))为TESTFILE :... fileVal = INT (TESTFILE 。的ReadLine ())... 返回[ X * fileVal 为X 在迭代器] >>> SC 。并行化([ 1 ,2 ,3 ,4 ]) 。mapPartitions (func )。搜集()[100,200,300,400]

addPyFile(path [source]

为将来在此SparkContext上执行的所有任务添加.py或.zip依赖项。该path传递可以是本地文件,在HDFS(或其他Hadoop的支持的文件系统)的文件,或HTTP,HTTPS或FTP URI。

applicationId

Spark应用程序的唯一标识符。其格式取决于调度程序的实现。

在本地火花应用程序的情况下,如“本地-1433865536131”

在YARN的情况下,像'application_1433865536131_34483'

>>> sc 。applicationId u'local -...'

binaryFiles(pathminPartitions = None [source]

注意


试验

从HDFS,本地文件系统(所有节点上都可用)或任何Hadoop支持的文件系统URI中读取二进制文件的目录作为字节数组。每个文件都被读取为单个记录并以键值对返回,其中键是每个文件的路径,该值是每个文件的内容。

注意


小文件是首选,大文件也是允许的,但可能会导致性能不佳。

binaryRecords(pathrecordLength [source]

注意


试验

如果每个记录都是一组具有指定数字格式的数字(请参阅ByteBuffer),并且每条记录的字节数是恒定的,则从平面二进制文件加载数据。

参数:路径 - 目录到输入数据文件

recordLength - 分割记录的长度

broadcast(价值[来源]

向群集广播一个只读变量,返回一个L {Broadcast }对象以便在分布式函数中读取它。该变量只会发送到每个群集一次。

cancelAllJobs()[source]

取消所有已安排或正在运行的作业。

cancelJobGroup(groupId [source]

取消指定组的活动作业。查看SparkContext.setJobGroup 更多信息。

defaultMinPartitions

当用户未给出Hadoop RDD的默认分区数时

defaultParallelism

当用户没有给出默认的并行度水平时(例如,为了减少任务)

dump_profiles(path [source]

将配置文件统计信息转储到目录路径中

emptyRDD()[source]

创建一个没有分区或元素的RDD。

getConf()[source]

getLocalProperty(key [source]

获取此线程中设置的本地属性,如果缺失则返回null。看到 setLocalProperty

classmethod getOrCreate(conf = None [source]

获取或实例化一个SparkContext并将其注册为一个单例对象。

参数:conf - SparkConf(可选)

hadoopFile(pathinputFormatClasskeyClassvalueClasskeyConverter = NonevalueConverter = Noneconf = NonebatchSize = 0 [source]

使用HDFS中的任意键和值类,本地文件系统(所有节点上都可用)或任何Hadoop支持的文件系统URI读取“旧”Hadoop InputFormat。该机制与sc.sequenceFile相同。

Hadoop配置可以作为Python字典传入。这将转换为Java中的配置。

参数:路径 - Hadoop文件的路径

inputFormatClass - Hadoop InputFormat的完全限定类名(例如“org.apache.hadoop.mapred.TextInputFormat”)

keyClass - 关键Writable类的完全限定类名(例如“org.apache.hadoop.io.Text”)

valueClass - 值的完全限定类名可写类(例如“org.apache.hadoop.io.LongWritable”)

keyConverter - (默认无)

valueConverter - (默认无)

conf - Hadoop配置,以字典形式传入(默认为无)

batchSize - 表示为单个Java对象的Python对象的数量。(默认为0,自动选择batchSize)

hadoopRDD(inputFormatClasskeyClassvalueClasskeyConverter = NonevalueConverter = Noneconf = NonebatchSize = 0 [source]

从任意Hadoop配置中读取具有任意键和值类的“旧”Hadoop InputFormat,该配置以Python词典形式传入。这将转换为Java中的配置。该机制与sc.sequenceFile相同。

参数:inputFormatClass - Hadoop InputFormat的完全限定类名(例如“org.apache.hadoop.mapred.TextInputFormat”)

keyClass - 关键Writable类的完全限定类名(例如“org.apache.hadoop.io.Text”)

valueClass - 值的完全限定类名可写类(例如“org.apache.hadoop.io.LongWritable”)

keyConverter - (默认无)

valueConverter - (默认无)

conf - Hadoop配置,以字典形式传入(默认为无)

batchSize - 表示为单个Java对象的Python对象的数量。(默认为0,自动选择batchSize)

newAPIHadoopFile(pathinputFormatClasskeyClassvalueClasskeyConverter = NonevalueConverter = Noneconf = NonebatchSize = 0 [source]

使用HDFS中的任意键和值类,本地文件系统(所有节点都可用)或任何Hadoop支持的文件系统URI读取“新API”Hadoop InputFormat。该机制与sc.sequenceFile相同。

Hadoop配置可以作为Python字典传入。这将转换为Java中的配置

参数:路径 - Hadoop文件的路径

inputFormatClass - Hadoop InputFormat的完全限定类名(例如“org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)

keyClass - 关键Writable类的完全限定类名(例如“org.apache.hadoop.io.Text”)

valueClass - 值的完全限定类名可写类(例如“org.apache.hadoop.io.LongWritable”)

keyConverter - (默认无)

valueConverter - (默认无)

conf - Hadoop配置,以字典形式传入(默认为无)

batchSize - 表示为单个Java对象的Python对象的数量。(默认为0,自动选择batchSize)

newAPIHadoopRDD(inputFormatClasskeyClassvalueClasskeyConverter = NonevalueConverter = Noneconf = NonebatchSize = 0 [source]

从任意Hadoop配置中读取具有任意键和值类的'新API'Hadoop InputFormat,该配置作为Python字典传入。这将转换为Java中的配置。该机制与sc.sequenceFile相同。

参数:inputFormatClass - Hadoop InputFormat的完全限定类名(例如“org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)

keyClass - 关键Writable类的完全限定类名(例如“org.apache.hadoop.io.Text”)

valueClass - 值的完全限定类名可写类(例如“org.apache.hadoop.io.LongWritable”)

keyConverter - (默认无)

valueConverter - (默认无)

conf - Hadoop配置,以字典形式传入(默认为无)

batchSize - 表示为单个Java对象的Python对象的数量。(默认为0,自动选择batchSize)

parallelize(cnumSlices = None [source]

分发本地Python集合以形成RDD。如果输入表示性能范围,则建议使用xrange。

>>> sc 。并行化([ 0 ,2 ,3 ,4 ,6 ],5 )。glom ()。collect ()[[0],[2],[3],[4],[6]] >>> sc 。并行化(x范围(0 ,6 ,2 ),5 )。glom ()。collect ()[[],[0],[],[2],[4]]

pickleFile(nameminPartitions = None [source]

加载先前使用RDD.saveAsPickleFile方法保存的RDD 。

>>> tmpFile = NamedTemporaryFile (delete = True )>>> tmpFile 。close ()>>> sc 。并行化(范围(10 ))。saveAsPickleFile (TMPFILE 。名,5 )>>> 排序(SC 。pickleFile (TMPFILE 。名称,3 )。收集())[0,1,2,3,4,5,6,7,8,9]

range(startend = Nonestep = 1numSlices = None [source]

创建一个包含从开始到结束 (独占)元素的int的新RDD,逐步增加每个元素。可以像python内置的range()函数一样调用。如果使用单个参数调用,参数被解释为结束,并且start被设置为0。

参数:开始 - 起始值

结束 - 最终值(独占)

 - 增量步(默认值:1)

numSlices - 新RDD的分区数量

返回:int的RDD

>>> sc 。范围(5 )。collect ()[0,1,2,3,4] >>> sc 。范围(2 ,4 )。collect ()[2,3] >>> sc 。范围(1 ,7 ,2 )。collect ()[1,3,5]

runJob(rddpartitionFuncpartitions = NoneallowLocal = False [source]

在指定的一组分区上执行给定的partitionFunc,并将结果作为一组元素返回。

如果未指定“分区”,则会在所有分区上运行。

>>> myRDD = sc 。并行化(范围(6 ),3 )>>> sc 。runJob (myRDD ,拉姆达部分:[ X * X 为X 在部分])[0,1,4,9,16,25]

>>> myRDD = sc 。并行化(范围(6 ),3 )>>> sc 。runJob (myRDD ,拉姆达部分:[ X * X 为X 在部分],[ 0 ,2 ],真)[0,1,16,25]

sequenceFile(pathkeyClass = NonevalueClass = NonekeyConverter = NonevalueConverter = NoneminSplits = NonebatchSize = 0 [source]

用任意键和值读取Hadoop SequenceFile HDFS的可写类,本地文件系统(所有节点都可用)或任何Hadoop支持的文件系统URI。机制如下:

Java RDD由SequenceFile或其他InputFormat以及键和值Writable类创建

序列化试图通过Pyrolite酸洗

如果失败,则回退是对每个键和值调用“toString”

PickleSerializer 用于反序列化Python端的pickle对象

参数:路径 - sequncefile的路径

keyClass - 关键Writable类的完全限定类名(例如“org.apache.hadoop.io.Text”)

valueClass - 值的完全限定类名可写类(例如“org.apache.hadoop.io.LongWritable”)

keyConverter -

valueConverter -

minSplits - 数据集中的最小分割(默认min(2,sc.defaultParallelism))

batchSize - 表示为单个Java对象的Python对象的数量。(默认为0,自动选择batchSize)

setCheckpointDir(dirName [source]

设置RDD将被检查点的目录。如果在群集上运行,该目录必须是HDFS路径。

setJobDescription(价值[来源]

设置当前作业的人类可读描述。

setJobGroup(groupIddescriptioninterruptOnCancel = False [source]

为此线程启动的所有作业分配一个组ID,直到组ID被设置为不同的值或清除。

通常,应用程序中的执行单元由多个Spark操作或作业组成。应用程序员可以使用此方法将所有这些作业分组在一起并给出组描述。一旦设置,Spark Web UI将把这些作业与这个组关联起来。

该应用程序可以SparkContext.cancelJobGroup用来取消该组中的所有正在运行的作业。

>>> import threading >>> from time import sleep >>> result = “Not Set” >>> lock = 线程。Lock ()>>> def map_func (x ):... sleep (100 )... raise Exception (“Task should be cancelled” )>>> def start_job (x ):... 全局结果... 尝试:sc 。setJobGroup (“job_to_cancel” ,“some description” )... result = sc 。并行化(范围(x ))。地图(map_func )。收集()... 除例外为ë :... 结果= “取消” ... 锁。release ()>>> def stop_job ():... sleep (5)... sc 。cancelJobGroup (“job_to_cancel” )>>> 剿= 锁。acquire ()>>> supress = 线程。线程(target = start_job ,args = (10 ,))。start ()>>> supress = 线程。线程(target = stop_job )。start ()>>> supress =锁定。acquire ()>>> print (result )已取消

如果作业组的interruptOnCancel设置为true,则作业取消将导致Thread.interrupt()在作业的执行程序线程上被调用。这有助于确保实时地停止任务,但由于HDFS-1208的原因,默认情况下会关闭,HDFS可能会通过将节点标记为无效来响应Thread.interrupt()。

setLocalProperty(keyvalue [source]

设置影响从此线程提交的作业的本地属性,例如Spark Fair Scheduler池。

setLogLevel(logLevel [source]

控制我们的logLevel。这会覆盖任何用户定义的日志设置。有效的日志级别包括:ALL,DEBUG,ERROR,FATAL,INFO,OFF,TRACE,WARN

classmethod setSystemProperty(keyvalue [source]

设置Java系统属性,如spark.executor.memory。这必须在实例化SparkContext之前调用。

show_profiles()[source]

将配置文件统计信息打印到标准输出

sparkUser()[source]

为运行SparkContext的用户获取SPARK_USER。

startTime

Spark Context开始时返回纪元时间。

statusTracker()[source]

返回StatusTracker对象

stop()[source]

关闭SparkContext。

textFile(nameminPartitions = Noneuse_unicode = True [source]

Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

If use_unicode is False, the strings will be kept as str (encoding as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)

>>> path=os.path.join(tempdir,"sample-text.txt")>>> withopen(path,"w")astestFile:... _=testFile.write("Hello world!")>>> textFile=sc.textFile(path)>>> textFile.collect()[u'Hello world!']

uiWebUrl

Return the URL of the SparkUI instance started by this SparkContext

union(rdds)[source]

Build the union of a list of RDDs.

This supports unions() of RDDs with different serialized formats, although this forces them to be reserialized using the default serializer:

>>> path=os.path.join(tempdir,"union-text.txt")>>> withopen(path,"w")astestFile:... _=testFile.write("Hello")>>> textFile=sc.textFile(path)>>> textFile.collect()[u'Hello']>>> parallelized=sc.parallelize(["World!"])>>> sorted(sc.union([textFile,parallelized]).collect())[u'Hello', 'World!']

version

The version of Spark on which this application is running.

wholeTextFiles(pathminPartitions=Noneuse_unicode=True)[source]

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

If use_unicode is False, the strings will be kept as str (encoding as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)

For example, if you have the following files:

hdfs://a-hdfs-path/part-00000hdfs://a-hdfs-path/part-00001...hdfs://a-hdfs-path/part-nnnnn

Do rdd = sparkContext.wholeTextFiles(“hdfs://a-hdfs-path”), then rdd contains:

(a-hdfs-path/part-00000,itscontent)(a-hdfs-path/part-00001,itscontent)...(a-hdfs-path/part-nnnnn,itscontent)

Note


Small files are preferred, as each file will be loaded fully in memory.

>>> dirPath=os.path.join(tempdir,"files")>>> os.mkdir(dirPath)>>> withopen(os.path.join(dirPath,"1.txt"),"w")asfile1:... _=file1.write("1")>>> withopen(os.path.join(dirPath,"2.txt"),"w")asfile2:... _=file2.write("2")>>> textFiles=sc.wholeTextFiles(dirPath)>>> sorted(textFiles.collect())[(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]

class pyspark.SparkFiles[source]

Resolves paths to files added through L{SparkContext.addFile()}.

SparkFiles contains only classmethods; users should not create SparkFiles instances.

classmethod get(filename)[source]

Get the absolute path of a file added through SparkContext.addFile().

classmethod getRootDirectory()[source]

Get the root directory that contains files added through SparkContext.addFile().

class pyspark.RDD(jrddctxjrdd_deserializer=AutoBatchedSerializer(PickleSerializer()))[source]

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.

aggregate(zeroValueseqOpcombOp)[source]

Aggregate the elements of each partition, and then the results for all the partitions, using a given combine functions and a neutral “zero value.”

The functions op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.

The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into an U and one operation for merging two U

>>> seqOp=(lambdax,y:(x[0]+y,x[1]+1))>>> combOp=(lambdax,y:(x[0]+y[0],x[1]+y[1]))>>> sc.parallelize([1,2,3,4]).aggregate((0,0),seqOp,combOp)(10, 4)>>> sc.parallelize([]).aggregate((0,0),seqOp,combOp)(0, 0)

aggregateByKey(zeroValueseqFunccombFuncnumPartitions=NonepartitionFunc=)[source]

Aggregate the values of each key, using given combine functions and a neutral “zero value”. This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U’s, The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.

cache()[source]

Persist this RDD with the default storage level (MEMORY_ONLY).

cartesian(other)[source]

Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in self and b is in other.

>>> rdd=sc.parallelize([1,2])>>> sorted(rdd.cartesian(rdd).collect())[(1, 1), (1, 2), (2, 1), (2, 2)]

checkpoint()[source]

Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext.setCheckpointDir() and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.

coalesce(numPartitionsshuffle=False)[source]

Return a new RDD that is reduced into numPartitions partitions.

>>> sc.parallelize([1,2,3,4,5],3).glom().collect()[[1], [2, 3], [4, 5]]>>> sc.parallelize([1,2,3,4,5],3).coalesce(1).glom().collect()[[1, 2, 3, 4, 5]]

cogroup(othernumPartitions=None)[source]

For each key k in self or other, return a resulting RDD that contains a tuple with the list of values for that key in self as well as other.

>>> x=sc.parallelize([("a",1),("b",4)])>>> y=sc.parallelize([("a",2)])>>> [(x,tuple(map(list,y)))forx,yinsorted(list(x.cogroup(y).collect ()))] [('a',([1],[2])),('b',([4],[]))]

collect()[source]

返回包含此RDD中所有元素的列表。

注意


因为所有的数据都被加载到驱动程序的内存中,所以只有当结果数组很小时才应使用该方法。

collectAsMap()[source]

将此RDD中的键值对作为字典返回给主数据库。

注意


因为所有的数据都加载到驱动程序的内存中,所以只有在结果数据很小时才应使用此方法。

>>> m = sc 。并行化([(1 ,2 ), (3 ,4 )]) 。collectAsMap ()>>> m [ 1 ] 2 >>> m [ 3 ] 4

combineByKey(createCombinermergeValuemergeCombinersnumPartitions = NonepartitionFunc =  [source]

通用函数使用自定义集合函数集合每个键的元素。

对于“组合类型”C,将RDD [(K,V)]转换为RDD [(K,C)]类型的结果。

用户提供三种功能:

createCombiner,它将V变成C(例如,创建一个元素列表)

mergeValue,将V合并到C中(例如,将其添加到列表的末尾)

mergeCombiners,将两个C合并为一个C(例如合并列表)

为了避免内存分配,mergeValue和mergeCombiners都允许修改并返回它们的第一个参数,而不是创建一个新的C.

另外,用户可以控制输出RDD的分区。

注意


V和C可以不同 - 例如,可以将类型(Int,Int)的RDD分组为类型(Int,List [Int])的RDD。

>>> x = sc 。parallelize ([(“a” ,1 ),(“b” ,1 ),(“a” ,2 )])>>> def to_list (a ):... return [ a ] ... >>> def append (a ,b ):... a 。追加(b )... 返回一个... >>> def extend(a ,b ):... a 。延伸(b )... 返回一个... >>> 排序(X 。combineByKey (to_list ,追加,延伸)。收集())[( 'A',[1,2]),( 'B', [1])]

context

SparkContextRDD创建于此。

count()[source]

返回此RDD中的元素数量。

>>> sc.parallelize([2,3,4]).count()3

countApprox(timeoutconfidence=0.95)[source]

Note


Experimental

Approximate version of count() that returns a potentially incomplete result within a timeout, even if not all tasks have finished.

>>> rdd=sc.parallelize(range(1000),10)>>> rdd.countApprox(1000,1.0)1000

countApproxDistinct(relativeSD=0.05)[source]

Note


Experimental

Return approximate number of distinct elements in the RDD.

The algorithm used is based on streamlib’s implementation of “HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm”, available here.

Parameters:relativeSD – Relative accuracy. Smaller values create counters that require more space. It must be greater than 0.000017.

>>> n=sc.parallelize(range(1000)).map(str).countApproxDistinct()>>> 900>> n=sc.parallelize([i%20foriinrange(1000)]).countApproxDistinct()>>> 16

countByKey()[source]

Count the number of elements for each key, and return the result to the master as a dictionary.

>>> rdd=sc.parallelize([("a",1),("b",1),("a",1)])>>> sorted(rdd.countByKey().items())[('a', 2), ('b', 1)]

countByValue()[source]

Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.

>>> sorted(sc.parallelize([1,2,1,2,2],2).countByValue().items())[(1, 2), (2, 3)]

distinct(numPartitions=None)[source]

Return a new RDD containing the distinct elements in this RDD.

>>> sorted(sc.parallelize([1,1,2,3]).distinct().collect())[1, 2, 3]

filter(f)[source]

Return a new RDD containing only the elements that satisfy a predicate.

>>> rdd=sc.parallelize([1,2,3,4,5])>>> rdd.filter(lambdax:x%2==0).collect()[2, 4]

first()[source]

Return the first element in this RDD.

>>> sc.parallelize([2,3,4]).first()2>>> sc.parallelize([]).first()Traceback (most recent call last):...ValueError:RDD is empty

flatMap(fpreservesPartitioning=False)[source]

Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

>>> rdd=sc.parallelize([2,3,4])>>> sorted(rdd.flatMap(lambdax:range(1,x)).collect())[1, 1, 1, 2, 2, 3]>>> sorted(rdd.flatMap(lambdax:[(x,x),(x,x)]).collect())[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]

flatMapValues(f)[source]

Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD’s partitioning.

>>> x=sc.parallelize([("a",["x","y","z"]),("b",["p","r"])])>>> deff(x):returnx>>> x.flatMapValues(f).collect()[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

fold(zeroValueop)[source]

Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value.”

The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.

This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.

>>> fromoperatorimportadd>>> sc.parallelize([1,2,3,4,5]).fold(0,add)15

foldByKey(zeroValuefuncnumPartitions=NonepartitionFunc=)[source]

Merge the values for each key using an associative function “func” and a neutral “zeroValue” which may be added to the result an arbitrary number of times, and must not change the result (e.g., 0 for addition, or 1 for multiplication.).

>>> rdd=sc.parallelize([("a",1),("b",1),("a",1)])>>> fromoperatorimportadd>>> sorted(rdd.foldByKey(0,add).collect())[('a', 2), ('b', 1)]

foreach(f)[source]

Applies a function to all elements of this RDD.

>>> deff(x):print(x)>>> sc.parallelize([1,2,3,4,5]).foreach(f)

foreachPartition(f)[source]

Applies a function to each partition of this RDD.

>>> deff(iterator):... forxiniterator:... print(x)>>> sc.parallelize([1,2,3,4,5]).foreachPartition(f)

fullOuterJoin(othernumPartitions=None)[source]

Perform a right outer join of self and other.

For each element (k, v) in self, the resulting RDD will either contain all pairs (k, (v, w)) for w in other, or the pair (k, (v, None)) if no elements in otherhave key k.

Similarly, for each element (k, w) in other, the resulting RDD will either contain all pairs (k, (v, w)) for v in self, or the pair (k, (None, w)) if no elements in self have key k.

Hash-partitions the resulting RDD into the given number of partitions.

>>> x=sc.parallelize([("a",1),("b",4)])>>> y=sc.parallelize([("a",2),("c",8)])>>> sorted(x.fullOuterJoin(y).collect())[('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]

getCheckpointFile()[source]

Gets the name of the file to which this RDD was checkpointed

Not defined if RDD is checkpointed locally.

getNumPartitions()[source]

Returns the number of partitions in RDD

>>> rdd=sc.parallelize([1,2,3,4],2)>>> rdd.getNumPartitions()2

getStorageLevel()[source]

Get the RDD’s current storage level.

>>> rdd1=sc.parallelize([1,2])>>> rdd1.getStorageLevel()StorageLevel(False, False, False, False, 1)>>> print(rdd1.getStorageLevel())Serialized 1x Replicated

glom()[source]

Return an RDD created by coalescing all elements within each partition into a list.

>>> rdd=sc.parallelize([1,2,3,4],2)>>> sorted(rdd.glom().collect())[[1, 2], [3, 4]]

groupBy(fnumPartitions=NonepartitionFunc=)[source]

Return an RDD of grouped items.

>>> rdd=sc.parallelize([1,1,2,3,5,8])>>> result=rdd.groupBy(lambdax:x%2).collect()>>> sorted([(x,sorted(y))for(x,y)inresult])[(0, [2, 8]), (1, [1, 1, 3, 5])]

groupByKey(numPartitions=NonepartitionFunc=)[source]

Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.

Note


If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will provide much better performance.

>>> rdd=sc.parallelize([("a",1),("b",1),("a",1)])>>> sorted(rdd.groupByKey().mapValues(len).collect())[('a', 2), ('b', 1)]>>> sorted(rdd.groupByKey().mapValues(list).collect())[('a', [1, 1]), ('b', [1])]

groupWith(other*others)[source]

Alias for cogroup but with support for multiple RDDs.

>>> w=sc.parallelize([("a",5),("b",6)])>>> x=sc.parallelize([("a",1),("b",4)])>>> y=sc.parallelize([("a",2)])>>> z=sc.parallelize([("b",42)])>>> [(x,tuple(map(list,y)))forx,yinsorted(list(w.groupWith(x,y,z).collect()))][('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]

histogram(buckets)[source]

Compute a histogram using the provided buckets. The buckets are all open to the right except for the last which is closed. e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50], which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 and 50 we would have a histogram of 1,0,1.

If your histogram is evenly spaced (e.g. [0, 10, 20, 30]), this can be switched from an O(log n) inseration to O(1) per element (where n is the number of buckets).

Buckets must be sorted, not contain any duplicates, and have at least two elements.

If buckets is a number, it will generate buckets which are evenly spaced between the minimum and maximum of the RDD. For example, if the min value is 0 and the max is 100, given buckets as 2, the resulting buckets will be [0,50) [50,100]. buckets must be at least 1. An exception is raised if the RDD contains infinity. If the elements in the RDD do not vary (max == min), a single bucket will be used.

The return value is a tuple of buckets and histogram.

>>> rdd=sc.parallelize(range(51))>>> rdd.histogram(2)([0, 25, 50], [25, 26])>>> rdd.histogram([0,5,25,50])([0, 5, 25, 50], [5, 20, 26])>>> rdd.histogram([0,15,30,45,60])# evenly spaced buckets([0, 15, 30, 45, 60], [15, 15, 15, 6])>>> rdd=sc.parallelize(["ab","ac","b","bd","ef"])>>> rdd.histogram(("a","b","c"))(('a', 'b', 'c'), [2, 2])

id()[source]

A unique ID for this RDD (within its SparkContext).

intersection(other)[source]

Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.

Note


This method performs a shuffle internally.

>>> rdd1=sc.parallelize([1,10,2,3,4,5])>>> rdd2=sc.parallelize([1,6,2,3,7,8])>>> rdd1.intersection(rdd2).collect()[1, 2, 3]

isCheckpointed()[source]

Return whether this RDD is checkpointed and materialized, either reliably or locally.

isEmpty()[source]

Returns true if and only if the RDD contains no elements at all.

Note


an RDD may be empty even when it has at least 1 partition.

>>> sc.parallelize([]).isEmpty()True>>> sc.parallelize([1]).isEmpty()False

isLocallyCheckpointed()[source]

Return whether this RDD is marked for local checkpointing.

Exposed for testing.

join(othernumPartitions=None)[source]

Return an RDD containing all pairs of elements with matching keys in self and other.

Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other.

Performs a hash join across the cluster.

>>> x=sc.parallelize([("a",1),("b",4)])>>> y=sc.parallelize([("a",2),("a",3)])>>> sorted(x.join(y).collect())[('a', (1, 2)), ('a', (1, 3))]

keyBy(f)[source]

Creates tuples of the elements in this RDD by applying f.

>>> x=sc.parallelize(range(0,3)).keyBy(lambdax:x*x)>>> y=sc.parallelize(zip(range(0,5),range(0,5)))>>> [(x,list(map(list,y)))forx,yinsorted(x.cogroup(y).collect())][(0, [[0], [0]]), (1, [[1], [1]]), (2, [[], [2]]), (3, [[], [3]]), (4, [[2], [4]])]

keys()[source]

Return an RDD with the keys of each tuple.

>>> m=sc.parallelize([(1,2),(3,4)]).keys()>>> m.collect()[1, 3]

leftOuterJoin(othernumPartitions=None)[source]

Perform a left outer join of self and other.

For each element (k, v) in self, the resulting RDD will either contain all pairs (k, (v, w)) for w in other, or the pair (k, (v, None)) if no elements in otherhave key k.

Hash-partitions the resulting RDD into the given number of partitions.

>>> x=sc.parallelize([("a",1),("b",4)])>>> y=sc.parallelize([("a",2)])>>> sorted(x.leftOuterJoin(y).collect())[('a', (1, 2)), ('b', (4, None))]

localCheckpoint()[source]

Mark this RDD for local checkpointing using Spark’s existing caching layer.

This method is for users who wish to truncate RDD lineages while skipping the expensive step of replicating the materialized data in a reliable distributed file system. This is useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).

Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed data is written to ephemeral local storage in the executors instead of to a reliable, fault-tolerant storage. The effect is that if an executor fails during the computation, the checkpointed data may no longer be accessible, causing an irrecoverable job failure.

This is NOT safe to use with dynamic allocation, which removes executors along with their cached blocks. If you must use both features, you are advised to set spark.dynamicAllocation.cachedExecutorIdleTimeout to a high value.

The checkpoint directory set through SparkContext.setCheckpointDir() is not used.

lookup(key)[source]

Return the list of values in the RDD for key key. This operation is done efficiently if the RDD has a known partitioner by only searching the partition that the key maps to.

>>> l=range(1000)>>> rdd=sc.parallelize(zip(l,l),10)>>> rdd.lookup(42)# slow[42]>>> sorted=rdd.sortByKey()>>> sorted.lookup(42)# fast[42]>>> sorted.lookup(1024)[]>>> rdd2=sc.parallelize([(('a','b'),'c')]).groupByKey()>>> list(rdd2.lookup(('a','b'))[0])['c']

map(fpreservesPartitioning=False)[source]

Return a new RDD by applying a function to each element of this RDD.

>>> rdd=sc.parallelize(["b","a","c"])>>> sorted(rdd.map(lambdax:(x,1)).collect())[('a', 1), ('b', 1), ('c', 1)]

mapPartitions(fpreservesPartitioning=False)[source]

Return a new RDD by applying a function to each partition of this RDD.

>>> rdd=sc.parallelize([1,2,3,4],2)>>> deff(iterator):yieldsum(iterator)>>> rdd.mapPartitions(f).collect()[3, 7]

mapPartitionsWithIndex(fpreservesPartitioning=False)[source]

Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

>>> rdd=sc.parallelize([1,2,3,4],4)>>> deff(splitIndex,iterator):yieldsplitIndex>>> rdd.mapPartitionsWithIndex(f).sum()6

mapPartitionsWithSplit(fpreservesPartitioning=False)[source]

Deprecated: use mapPartitionsWithIndex instead.

Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

>>> rdd=sc.parallelize([1,2,3,4],4)>>> deff(splitIndex,iterator):yieldsplitIndex>>> rdd.mapPartitionsWithSplit(f).sum()6

mapValues(f)[source]

Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD’s partitioning.

>>> x=sc.parallelize([("a",["apple","banana","lemon"]),("b",["grapes"])])>>> deff(x):returnlen(x)>>> x.mapValues(f).collect()[('a', 3), ('b', 1)]

max(key=None)[source]

Find the maximum item in this RDD.

Parameters:key – A function used to generate key for comparing

>>> rdd=sc.parallelize([1.0,5.0,43.0,10.0])>>> rdd.max()43.0>>> rdd.max(key=str)5.0

mean()[source]

Compute the mean of this RDD’s elements.

>>> sc.parallelize([1,2,3]).mean()2.0

meanApprox(timeoutconfidence=0.95)[source]

Note


Experimental

Approximate operation to return the mean within a timeout or meet the confidence.

>>> rdd=sc.parallelize(range(1000),10)>>> r=sum(range(1000))/1000.0>>> abs(rdd.meanApprox(1000)-r)/r<0.05True

min(key=None)[source]

Find the minimum item in this RDD.

Parameters:key – A function used to generate key for comparing

>>> rdd=sc.parallelize([2.0,5.0,43.0,10.0])>>> rdd.min()2.0>>> rdd.min(key=str)10.0

name()[source]

Return the name of this RDD.

partitionBy(numPartitionspartitionFunc=)[source]

Return a copy of the RDD partitioned using the specified partitioner.

>>> pairs=sc.parallelize([1,2,3,4,2,4,1]).map(lambdax:(x,x))>>> sets=pairs.partitionBy(2).glom().collect()>>> len(set(sets[0]).intersection(set(sets[1])))0

persist(storageLevel=StorageLevel(FalseTrueFalseFalse1))[source]

Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_ONLY).

>>> rdd=sc.parallelize(["b","a","c"])>>> rdd.persist().is_cachedTrue

pipe(commandenv=NonecheckCode=False)[source]

Return an RDD created by piping elements to a forked external process.

>>> sc.parallelize(['1','2','','3']).pipe('cat').collect()[u'1', u'2', u'', u'3']

Parameters:checkCode – whether or not to check the return value of the shell command.

randomSplit(weightsseed=None)[source]

Randomly splits this RDD with the provided weights.

Parameters:weights – weights for splits, will be normalized if they don’t sum to 1

seed – random seed

Returns:split RDDs in a list

>>> rdd=sc.parallelize(range(500),1)>>> rdd1,rdd2=rdd.randomSplit([2,3],17)>>> len(rdd1.collect()+rdd2.collect())500>>> 150>> 250

reduce(f)[source]

Reduces the elements of this RDD using the specified commutative and associative binary operator. Currently reduces partitions locally.

>>> fromoperatorimportadd>>> sc.parallelize([1,2,3,4,5]).reduce(add)15>>> sc.parallelize((2for_inrange(10))).map(lambdax:1).cache().reduce(add)10>>> sc.parallelize([]).reduce(add)Traceback (most recent call last):...ValueError:Can not reduce() empty RDD

reduceByKey(funcnumPartitions=NonepartitionFunc=)[source]

Merge the values for each key using an associative and commutative reduce function.

This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.

Output will be partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified. Default partitioner is hash-partition.

>>> fromoperatorimportadd>>> rdd=sc.parallelize([("a",1),("b",1),("a",1)])>>> sorted(rdd.reduceByKey(add).collect())[('a', 2), ('b', 1)]

reduceByKeyLocally(func)[source]

Merge the values for each key using an associative and commutative reduce function, but return the results immediately to the master as a dictionary.

This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.

>>> fromoperatorimportadd>>> rdd=sc.parallelize([("a",1),("b",1),("a",1)])>>> sorted(rdd.reduceByKeyLocally(add).items())[('a', 2), ('b', 1)]

repartition(numPartitions)[source]

Return a new RDD that has exactly numPartitions partitions.

Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.

>>> rdd=sc.parallelize([1,2,3,4,5,6,7],4)>>> sorted(rdd.glom().collect())[[1], [2, 3], [4, 5], [6, 7]]>>> len(rdd.repartition(2).glom().collect())2>>> len(rdd.repartition(10).glom().collect())10

repartitionAndSortWithinPartitions(numPartitions=NonepartitionFunc=ascending=Truekeyfunc=>)[source]

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys.

>>> rdd=sc.parallelize([(0,5),(3,8),(2,6),(0,8),(3,8),(1,3)])>>> rdd2=rdd.repartitionAndSortWithinPartitions(2,lambdax:x%2,True)>>> rdd2.glom().collect()[[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]

rightOuterJoin(othernumPartitions=None)[source]

Perform a right outer join of self and other.

For each element (k, w) in other, the resulting RDD will either contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w)) if no elements in selfhave key k.

Hash-partitions the resulting RDD into the given number of partitions.

>>> x=sc.parallelize([("a",1),("b",4)])>>> y=sc.parallelize([("a",2)])>>> sorted(y.rightOuterJoin(x).collect())[('a', (2, 1)), ('b', (None, 4))]

sample(withReplacementfractionseed=None)[source]

Return a sampled subset of this RDD.

Parameters:withReplacement – can elements be sampled multiple times (replaced when sampled out)

fraction – expected size of the sample as a fraction of this RDD’s size without replacement: probability that each element is chosen; fraction must be [0, 1] with replacement: expected number of times each element is chosen; fraction must be >= 0

seed – seed for the random number generator

Note


This is not guaranteed to provide exactly the fraction specified of the total count of the given DataFrame.

>>> rdd=sc.parallelize(range(100),4)>>> 6<=rdd.sample(False,0.1,81).count()<=14True

sampleByKey(withReplacementfractionsseed=None)[source]

Return a subset of this RDD sampled by key (via stratified sampling). Create a sample of this RDD using variable sampling rates for different keys as specified by fractions, a key to sampling rate map.

>>> fractions={"a":0.2,"b":0.1}>>> rdd=sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0,1000)))>>> sample=dict(rdd.sampleByKey(False,fractions,2).groupByKey().collect())>>> 100>> max(sample["a"])<=999andmin(sample["a"])>=0True>>> max(sample["b"])<=999andmin(sample["b"])>=0True

sampleStdev()[source]

Compute the sample standard deviation of this RDD’s elements (which corrects for bias in estimating the standard deviation by dividing by N-1 instead of N).

>>> sc.parallelize([1,2,3]).sampleStdev()1.0

sampleVariance()[source]

Compute the sample variance of this RDD’s elements (which corrects for bias in estimating the variance by dividing by N-1 instead of N).

>>> sc 。并行化([ 1 ,2 ,3 ]) 。sampleVariance ()1.0

saveAsHadoopDataset(confkeyConverter = NonevalueConverter = None [source]

使用旧的Hadoop OutputFormat API(mapred包),将键值对(形式为RDD [(K,V)])的Python RDD输出到任何Hadoop文件系统。键/值将使用用户指定的转换器或默认情况下转换为输出 org.apache.spark.api.python.JavaToWritableConverter。

参数:conf - Hadoop作业配置,作为字典传入

keyConverter - (默认无)

valueConverter - (默认无)

saveAsHadoopFile(pathoutputFormatClasskeyClass = NonevalueClass = NonekeyConverter = NonevalueConverter = Noneconf = NonecompressionCodecClass = None [source]

使用旧的Hadoop OutputFormat API(mapred包),将键值对(形式为RDD [(K,V)])的Python RDD输出到任何Hadoop文件系统。如果未指定,则会推断键和值类型。使用用户指定的转换器或键,将键和值转换为输出org.apache.spark.api.python.JavaToWritableConverter。将 conf其应用于与此RDD的SparkContext关联的基本Hadoop conf之上,以创建用于保存数据的合并Hadoop MapReduce作业配置。

参数:路径 - Hadoop文件的路径

outputFormatClass - Hadoop OutputFormat的完全限定类名(例如“org.apache.hadoop.mapred.SequenceFileOutputFormat”)

keyClass - 关键Writable类的完全限定类名(例如“org.apache.hadoop.io.IntWritable”,默认为None)

valueClass - 值的完全限定类名可写类(例如“org.apache.hadoop.io.Text”,默认为None)

keyConverter - (默认无)

valueConverter - (默认无)

conf - (默认没有)

compressionCodecClass - (默认无)

saveAsNewAPIHadoopDataset(confkeyConverter = NonevalueConverter = None [source]

使用新的Hadoop OutputFormat API(mapreduce包),将键值对(形式为RDD [(K,V)]的Python RDD输出到任何Hadoop文件系统。键/值将使用用户指定的转换器或默认情况下转换为输出 org.apache.spark.api.python.JavaToWritableConverter。

参数:conf - Hadoop作业配置,作为字典传入

keyConverter - (默认无)

valueConverter - (默认无)

saveAsNewAPIHadoopFile(pathoutputFormatClasskeyClass = NonevalueClass = NonekeyConverter = NonevalueConverter = Noneconf = None [source]

使用新的Hadoop OutputFormat API(mapreduce包),将键值对(形式为RDD [(K,V)]的Python RDD输出到任何Hadoop文件系统。如果未指定,则会推断键和值类型。使用用户指定的转换器或键,将键和值转换为输出org.apache.spark.api.python.JavaToWritableConverter。将 conf其应用于与此RDD的SparkContext关联的基本Hadoop conf之上,以创建用于保存数据的合并Hadoop MapReduce作业配置。

参数:路径 - Hadoop文件的路径

outputFormatClass - Hadoop OutputFormat的完全限定类名(例如“org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat”)

keyClass - 关键Writable类的完全限定类名(例如“org.apache.hadoop.io.IntWritable”,默认为None)

valueClass - 值的完全限定类名可写类(例如“org.apache.hadoop.io.Text”,默认为None)

keyConverter - (默认无)

valueConverter - (默认无)

conf - Hadoop作业配置,以字典形式传入(默认为无)

saveAsPickleFile(pathbatchSize = 10 [source]

将此RDD保存为序列化对象的SequenceFile。使用的序列化程序是pyspark.serializers.PickleSerializer,默认批量大小为10。

>>> tmpFile=NamedTemporaryFile(delete=True)>>> tmpFile.close()>>> sc.parallelize([1,2,'spark','rdd']).saveAsPickleFile(tmpFile.name,3)>>> sorted(sc.pickleFile(tmpFile.name,5).map(str).collect())['1', '2', 'rdd', 'spark']

saveAsSequenceFile(pathcompressionCodecClass=None)[source]

Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system, using the org.apache.hadoop.io.Writable types that we convert from the RDD’s key and value types. The mechanism is as follows:

Pyrolite is used to convert pickled Python RDD into RDD of Java objects.

Keys and values of this Java RDD are converted to Writables and written out.

Parameters:path – path to sequence file

compressionCodecClass – (None by default)

saveAsTextFile(pathcompressionCodecClass=None)[source]

Save this RDD as a text file, using string representations of elements.

Parameters:path – path to text file

compressionCodecClass – (None by default) string i.e. “org.apache.hadoop.io.compress.GzipCodec”

>>> tempFile=NamedTemporaryFile(delete=True)>>> tempFile.close()>>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)>>> fromfileinputimportinput>>> fromglobimportglob>>> ''.join(sorted(input(glob(tempFile.name+"/part-0000*"))))'0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'

Empty lines are tolerated when saving to text files.

>>> tempFile2=NamedTemporaryFile(delete=True)>>> tempFile2.close()>>> sc.parallelize(['','foo','','bar','']).saveAsTextFile(tempFile2.name)>>> ''.join(sorted(input(glob(tempFile2.name+"/part-0000*"))))'\n\n\nbar\nfoo\n'

Using compressionCodecClass

>>> tempFile3=NamedTemporaryFile(delete=True)>>> tempFile3.close()>>> codec="org.apache.hadoop.io.compress.GzipCodec">>> sc.parallelize(['foo','bar']).saveAsTextFile(tempFile3.name,codec)>>> fromfileinputimportinput,hook_compressed>>> result=sorted(input(glob(tempFile3.name+"/part*.gz"),openhook=hook_compressed))>>> b''.join(result).decode('utf-8')u'bar\nfoo\n'

setName(name)[source]

Assign a name to this RDD.

>>> rdd1=sc.parallelize([1,2])>>> rdd1.setName('RDD1').name()u'RDD1'

sortBy(keyfuncascending=TruenumPartitions=None)[source]

Sorts this RDD by the given keyfunc

>>> tmp=[('a',1),('b',2),('1',3),('d',4),('2',5)]>>> sc.parallelize(tmp).sortBy(lambdax:x[0]).collect()[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]>>> sc.parallelize(tmp).sortBy(lambdax:x[1]).collect()[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

sortByKey(ascending=TruenumPartitions=Nonekeyfunc=>)[source]

Sorts this RDD, which is assumed to consist of (key, value) pairs.

>>> tmp=[('a',1),('b',2),('1',3),('d',4),('2',5)]>>> sc.parallelize(tmp).sortByKey().first()('1', 3)>>> sc.parallelize(tmp).sortByKey(True,1).collect()[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]>>> sc.parallelize(tmp).sortByKey(True,2).collect()[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]>>> tmp2=[('Mary',1),('had',2),('a',3),('little',4),('lamb',5)]>>> tmp2.extend([('whose',6),('fleece',7),('was',8),('white',9)])>>> sc.parallelize(tmp2).sortByKey(True,3,keyfunc=lambdak:k.lower()).collect()[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]

stats()[source]

Return a StatCounter object that captures the mean, variance and count of the RDD’s elements in one operation.

stdev()[source]

Compute the standard deviation of this RDD’s elements.

>>> sc.parallelize([1,2,3]).stdev()0.816...

subtract(othernumPartitions=None)[source]

Return each value in self that is not contained in other.

>>> x=sc.parallelize([("a",1),("b",4),("b",5),("a",3)])>>> y=sc.parallelize([("a",3),("c",None)])>>> sorted(x.subtract(y).collect())[('a', 1), ('b', 4), ('b', 5)]

subtractByKey(othernumPartitions=None)[source]

Return each (key, value) pair in self that has no pair with matching key in other.

>>> x=sc.parallelize([("a",1),("b",4),("b",5),("a",2)])>>> y=sc.parallelize([("a",3),("c",None)])>>> sorted(x.subtractByKey(y).collect())[('b', 4), ('b', 5)]

sum()[source]

Add up the elements in this RDD.

>>> sc.parallelize([1.0,2.0,3.0]).sum()6.0

sumApprox(timeoutconfidence=0.95)[source]

Note


Experimental

Approximate operation to return the sum within a timeout or meet the confidence.

>>> rdd=sc.parallelize(range(1000),10)>>> r=sum(range(1000))>>> abs(rdd.sumApprox(1000)-r)/r<0.05True

take(num)[source]

Take the first num elements of the RDD.

It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

Translated from the Scala implementation in RDD#take().

Note


this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.

>>> sc.parallelize([2,3,4,5,6]).cache().take(2)[2, 3]>>> sc.parallelize([2,3,4,5,6]).take(10)[2, 3, 4, 5, 6]>>> sc.parallelize(range(100),100).filter(lambdax:x>90).take(3)[91, 92, 93]

takeOrdered(numkey=None)[source]

Get the N elements from an RDD ordered in ascending order or as specified by the optional key function.

Note


this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.

>>> sc.parallelize([10,1,2,9,3,4,5,6,7]).takeOrdered(6)[1, 2, 3, 4, 5, 6]>>> sc.parallelize([10,1,2,9,3,4,5,6,7],2).takeOrdered(6,key=lambdax:-x)[10, 9, 7, 6, 5, 4]

takeSample(withReplacementnumseed=None)[source]

Return a fixed-size sampled subset of this RDD.

Note


This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.

>>> rdd=sc.parallelize(range(0,10))>>> len(rdd.takeSample(True,20,1))20>>> len(rdd.takeSample(False,5,2))5>>> len(rdd.takeSample(False,15,3))10

toDebugString()[source]

A description of this RDD and its recursive dependencies for debugging.

toLocalIterator()[source]

Return an iterator that contains all of the elements in this RDD. The iterator will consume as much memory as the largest partition in this RDD.

>>> rdd=sc.parallelize(range(10))>>> [xforxinrdd.toLocalIterator()][0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

top(numkey=None)[source]

Get the top N elements from an RDD.

Note


This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.

Note


It returns the list sorted in descending order.

>>> sc.parallelize([10,4,2,12,3]).top(1)[12]>>> sc.parallelize([2,3,4,5,6],2).top(2)[6, 5]>>> sc.parallelize([10,4,2,12,3]).top(3,key=str)[4, 3, 2]

treeAggregate(zeroValueseqOpcombOpdepth=2)[source]

Aggregates the elements of this RDD in a multi-level tree pattern.

Parameters:depth – suggested depth of the tree (default: 2)

>>> add=lambdax,y:x+y>>> rdd=sc.parallelize([-5,-4,-3,-2,-1,1,2,3,4],10)>>> rdd.treeAggregate(0,add,add)-5>>> rdd.treeAggregate(0,add,add,1)-5>>> rdd.treeAggregate(0,add,add,2)-5>>> rdd.treeAggregate(0,add,add,5)-5>>> rdd.treeAggregate(0,add,add,10)-5

treeReduce(fdepth=2)[source]

Reduces the elements of this RDD in a multi-level tree pattern.

Parameters:depth – suggested depth of the tree (default: 2)

>>> add=lambdax,y:x+y>>> rdd=sc.parallelize([-5,-4,-3,-2,-1,1,2,3,4],10)>>> rdd.treeReduce(add)-5>>> rdd.treeReduce(add,1)-5>>> rdd.treeReduce(add,2)-5>>> rdd.treeReduce(add,5)-5>>> rdd.treeReduce(add,10)-5

union(other)[source]

Return the union of this RDD and another one.

>>> rdd=sc.parallelize([1,1,2,3])>>> rdd.union(rdd).collect()[1, 1, 2, 3, 1, 1, 2, 3]

unpersist()[source]

Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.

values()[source]

Return an RDD with the values of each tuple.

>>> m=sc.parallelize([(1,2),(3,4)]).values()>>> m.collect()[2, 4]

variance()[source]

Compute the variance of this RDD’s elements.

>>> sc.parallelize([1,2,3]).variance()0.666...

zip(other)[source]

Zips this RDD with another one, returning key-value pairs with the first element in each RDD second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).

>>> x = sc 。并行化(范围(0 ,5 ))>>> ý = SC 。并行化(范围(1000 ,1005 ))>>> X 。zip (y )。collect ()[(0,1000),(1,1001),(2,1002),(3,1003),(4,1004)]

zipWithIndex()[source]

将此RDD与其元素索引一起拉伸。

排序首先基于分区索引,然后是每个分区内项目的排序。因此,第一个分区中的第一个项目获取索引0,最后一个分区中的最后一个项目接收最大的索引。

当此RDD包含多个分区时,此方法需要触发Spark任务。

>>> sc 。并行化([ “a” ,“b” ,“c” ,“d” ],3 )。zipWithIndex ()。collect ()[('a',0),('b',1),('c',2),('d',3)]

zipWithUniqueId()[source]

使用生成的独特Long ID对此RDD进行压缩。

第k个分区中的项目将获得ids k,n + k,2 * n + k,...,其中n是分区数。所以可能存在差距,但这种方法不会引发火花的工作,这是不同的zipWithIndex

>>> sc 。并行化([ “a” ,“b” ,“c” ,“d” ,“e” ],3 )。zipWithUniqueId ()。collect ()[('a',0),('b',1),('c',4),('d',2),('e',5)]

pyspark.StorageLevel(useDiskuseMemoryuseOffHeap反序列化复制= 1 [source]

用于控制RDD存储的标志。每个StorageLevel记录是否使用内存,是否将RDD丢弃到内存不足,是否将数据保存在特定于JAVA的序列化格式的内存中,以及是否在多个节点上复制RDD分区。还包含一些常用存储级别MEMORY_ONLY的静态常量。由于数据总是在Python端序列化,所有常量都使用序列化格式。

DISK_ONLY= StorageLevel(True,False,False,False,1)

DISK_ONLY_2= StorageLevel(True,False,False,False,2)

MEMORY_AND_DISK= StorageLevel(True,True,False,False,1)

MEMORY_AND_DISK_2= StorageLevel(True,True,False,False,2)

MEMORY_AND_DISK_SER= StorageLevel(True,True,False,False,1)

MEMORY_AND_DISK_SER_2= StorageLevel(True,True,False,False,2)

MEMORY_ONLY= StorageLevel(False,True,False,False,1)

MEMORY_ONLY_2= StorageLevel(False,True,False,False,2)

MEMORY_ONLY_SER= StorageLevel(False,True,False,False,1)

MEMORY_ONLY_SER_2= StorageLevel(False,True,False,False,2)

OFF_HEAP= StorageLevel(True,True,True,False,1)

class pyspark.Broadcast(sc = Nonevalue = Nonepickle_registry = Nonepath = None [source]

使用创建的广播变量SparkContext.broadcast()。通过访问它的价值value。

例子:

>>> from pyspark.context import SparkContext >>> sc = SparkContext ('local' ,'test' )>>> b = sc 。广播([ 1 ,2 ,3 ,4 ,5 ])>>> b 。值[1,2,3,4,5] >>> sc 。并行化([ 0 ,0 ]) 。flatMap (lambda x :b。值)。collect ()[1,2,3,4,5,1,2,3,4,5] >>> b 。unpersist ()

>>> large_broadcast = sc 。广播(范围(10000 ))

destroy()[source]

销毁与此广播变量相关的所有数据和元数据。谨慎使用这个; 一旦广播变量被销毁,它不能再被使用。此方法阻止直到销毁完成。

dump(value[source]

load(path [source]

unpersist(blocking = False [源代码]

在执行者上删除此广播的缓存副本。如果广播在调用之后使用,则需要将其重新发送给每个执行者。

参数:阻止 - 是否阻止,直到未完成

value

返回广播的值

class pyspark.Accumulator(aidvalueaccum_param [source]

可以累积的共享变量,即具有可交换和关联的“添加”操作。Spark集群上的工作器任务可以使用+ = 运算符将值添加到累加器,但只有驱动程序可以使用访问其值value。工人的更新会自动传播到驱动程序。

虽然SparkContext载体对累加器基本数据类型,如int和 float,用户还可以通过提供一种定制定义的自定义类型累加器 AccumulatorParam对象。举例来说,参考这个模块的doctest。

add(term [source]

为此累加器的值添加一个术语

value

获取累加器的值; 只能在驱动程序中使用

class pyspark.AccumulatorParam[source]

Helper对象,用于定义如何累积给定类型的值。

addInPlace(value1value2 [source]

添加累加器数据类型的两个值,返回一个新值; 为了效率,也可以更新value1并返回它。

zero(价值[来源]

为该类型提供“零值”,与所提供的尺寸兼容value(例如,零向量)

class pyspark.MarshalSerializer[source]

使用Python的Marshal序列化器序列化对象:

http://docs.python.org/2/library/marshal.html

该序列化程序比PickleSerializer更快,但支持更少的数据类型。

dumps(obj [source]

loads(obj [source]

class pyspark.PickleSerializer[source]

使用Python的pickle序列化器序列化对象:

http://docs.python.org/2/library/pickle.html

这个序列化程序几乎支持任何Python对象,但可能不如更专用的序列化程序那么快。

dumps(obj [source]

loads(objencoding = None [source]

class pyspark.StatusTracker(jtracker [source]

Low-level status reporting APIs for monitoring job and stage progress.

These APIs intentionally provide very weak consistency semantics; consumers of these APIs should be prepared to handle empty / missing information. For example, a job’s stage ids may be known but the status API may not have any information about the details of those stages, so getStageInfo could potentially return None for a valid stage id.

To limit memory usage, these APIs only provide information on recent jobs / stages. These APIs will provide information for the lastspark.ui.retainedStages stages and spark.ui.retainedJobs jobs.

getActiveJobsIds()[source]

Returns an array containing the ids of all active jobs.

getActiveStageIds()[source]

Returns an array containing the ids of all active stages.

getJobIdsForGroup(jobGroup=None)[source]

Return a list of all known jobs in a particular job group. If jobGroup is None, then returns all known jobs that are not associated with a job group.

The returned list may contain running, failed, and completed jobs, and may vary across invocations of this method. This method does not guarantee the order of the elements in its result.

getJobInfo(jobId)[source]

Returns a SparkJobInfo object, or None if the job info could not be found or was garbage collected.

getStageInfo(stageId)[source]

Returns a SparkStageInfo object, or None if the stage info could not be found or was garbage collected.

class pyspark.SparkJobInfo[source]

Exposes information about Spark Jobs.

class pyspark.SparkStageInfo[source]

Exposes information about Spark Stages.

class pyspark.Profiler(ctx)[source]

Note


DeveloperApi

PySpark supports custom profilers, this is to allow for different profilers to be used as well as outputting to different formats than what is provided in the BasicProfiler.

A custom profiler has to define or inherit the following methods:

profile - will produce a system profile of some sort. stats - return the collected stats. dump - dumps the profiles to a path add - adds a profile to the existing accumulated profile

The profiler class is chosen when creating a SparkContext

>>> frompysparkimportSparkConf,SparkContext>>> frompysparkimportBasicProfiler>>> classMyCustomProfiler(BasicProfiler):... defshow(self,id):... print("My custom profiles for RDD:%s"%id)...>>> conf=SparkConf().set("spark.python.profile","true")>>> sc=SparkContext('local','test',conf=conf,profiler_cls=MyCustomProfiler)>>> sc.parallelize(range(1000)).map(lambdax:2*x).take(10)[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]>>> sc.parallelize(range(1000))。count ()1000 >>> sc 。show_profiles ()我的RDD自定义配置文件:1 我的RDD自定义配置文件:3 >>> sc 。停止()

dump(idpath [source]

将配置文件转储到路径中,id是RDD标识

profile(func [source]

分析功能func

show(id [source]

将配置文件统计信息打印到标准输出,id是RDD标识

stats()[source]

返回收集的性能分析统计信息(pstats.Stats)

class pyspark.BasicProfiler(ctx [source]

BasicProfiler是默认的分析器,它是基于cProfile和Accumulator实现的

profile(func [source]

运行并分析传入的方法to_profile。返回配置文件对象。

stats()[source]

class pyspark.TaskContext[source]

注意


试验

关于可在执行期间读取或变更的任务的上下文信息。要访问正在运行的任务的TaskContext,请使用: TaskContext.get()

attemptNumber()[source]

“这个任务尝试了多少次。第一个任务尝试将被分配attemptNumber = 0,并且随后的尝试将具有增加的尝试次数。

classmethod get()[source]

返回当前活动的TaskContext。这可以在用户函数内部调用,以访问关于正在运行的任务的上下文信息。

注意


必须向工人而不是司机打电话。如果未初始化,则返回None。

partitionId()[source]

此任务计算的RDD分区的ID。

stageId()[source]

此任务所属阶段的ID。

taskAttemptId()[source]

对于此任务尝试唯一的标识(在同一个SparkContext中,没有两个任务尝试将共享相同的尝试标识)。这大致相当于Hadoop的TaskAttemptID。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,013评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,205评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,370评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,168评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,153评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,954评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,271评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,916评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,382评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,877评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,989评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,624评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,209评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,199评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,418评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,401评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,700评论 2 345

推荐阅读更多精彩内容