spark RDD 编程

RDD可以通过两种方式创建:

1:读取一个外部数据接,比如从本地文件加载数据集,或者从HDFS文件系统,HBase等外部数据系统加载数据。Spark也可以支持文本文件,SequenceFile文件和其他符合Hadoop InputFormat 格式的文件

2:调用SparkContext 的parallelize方法,在Driver中一个已经存在的集合(数组)上创建

从文件系统中加载数据创建RDD

Spark 采用textFile() 方法来从文件系统中加载数据创建RDD,该方法把文件的URL作为参数,这个URL可以本地文件系统的地址,或者是分布式文件系统HDFS的地址

     val =sc.textFile("file:///home/data/word.txt")

在使用Spark读取文件时,需要说明以下几点:

(1)如果使用了本地文件系统的路径,那么,必须要保证在所有的worker节点上,也都能够采用相同的路径访问到该文件,比如,可以把该文件拷贝到每个worker节点上,或者也可以使用网络挂载共享文件系统。

(2)textFile()方法的输入参数,可以是文件名,也可以是目录,也可以是压缩文件等。比如,textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

(3)textFile()方法也可以接受第2个输入参数(可选),用来指定分区的数目。默认情况下,Spark会为HDFS的每个block创建一个分区(HDFS中每个block默认是128MB)。你也可以提供一个比block数量更大的值作为分区数目,但是,你不能提供一个小于block数量的值作为分区数目。

通过并行集合(数组)创建RDD

调用 SparkContext的parallelize方法,在Driver中一个已经存在的集合【数组】上创建

代码:val array=Array(1,2,3,4,5)

          val rdd =sc.parallenlize(array)


scala

RDD操作

RDD 被创建好之后,在后续使用过程中一般会发生两种操作:

           1....... 转换【Transformation】:基于现在的数据集创建一个新的数据集

           2 .......行动【Action】: 在数据集上运行计算,返回计算值

转换操作

对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。

下面列出一些常见的转换操作(Transformation API):

* filter(func):筛选出满足函数func的元素,并返回一个新的数据集

* map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集

* flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果

* groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集

* reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合

行动操作

行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。

下面列出一些常见的行动操作(Action API):

* count()  返回数据集中的元素个数

* collect() 以数组的形式返回数据集中的所有元素

* first()  返回数据集中的第一个元素

* take(n)  以数组的形式返回数据集中的前n个元素

* reduce(func)  通过函数func(输入两个参数并返回一个值)聚合数据集中的元素

* foreach(func) 将数据集中的每个元素传递到函数func中运行*

持久化

前面我们已经说过,在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。如果整个Spark程序中只有一次行动操作,这当然不会有什么问题。但是,在一些情形下,我们需要多次调用不同的行动操作,这就意味着,每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。

下面就是多次计算同一个DD的例子


rutusuosi

实际上,可以通过持久化(缓存)机制避免这种重复计算的开销。可以使用persist()方法对一个RDD标记为持久化,之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化,持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用。

persist()的圆括号中包含的是持久化级别参数,比如,persist(MEMORY_ONLY)表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容。persist(MEMORY_AND_DISK)表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上。一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)。

分区

RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。RDD分区的一个分区原则是使得分区的个数尽量等于集群中的CPU核心(core)数目。

对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:

*本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N;

*Apache Mesos:默认的分区数为8;

*Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值;

因此,对于parallelize而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism,比如:

scala>val array=Array(1,2,3,4,5)

array:Array[Int]=Array(1,2,3,4,5)

scala>val rdd=sc.parallelize(array,2)#设置两个分区

rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[13]at parallelize at:29

scala

对于textFile而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism,2),其中,defaultParallelism对应的就是spark.default.parallelism。

如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片)。

打印元素

在实际编程中,我们经常需要把RDD中的元素打印输出到屏幕上(标准输出stdout),一般会采用语句rdd.foreach(println)或者rdd.map(println)。当采用本地模式(local)在单机上执行时,这些语句会打印出一个RDD中的所有元素。但是,当采用集群模式执行时,在worker节点上执行打印语句是输出到worker节点的stdout中,而不是输出到任务控制节点Driver

Program中,因此,任务控制节点Driver

Program中的stdout是不会显示打印语句的这些输出内容的。为了能够把所有worker节点上的打印输出信息也显示到Driver

Program中,可以使用collect()方法,比如,rdd.collect().foreach(println),但是,由于collect()方法会把各个worker节点上的所有RDD元素都抓取到Driver

Program中,因此,这可能会导致内存溢出。因此,当你只需要打印RDD的部分元素时,可以采用语句rdd.take(100).foreach(println)。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容