【2019-06-23】键值对操作

键值对RDD是spark中许多操作所需要的常见数据类型。键值对RDD通常用来进行聚合计算,一般要先通过一些初始的ETL(抽取、转化、装载)操作来将数据转化为键值对形式。
spark为包含键值对类型的RDD提供了一些专有的操作,这些RDD被称为pairRDD。
创建pairRDD

#在python中使用第一个单词作为键创建出一个pairRDD
lines = sc.parallelize(["hello world","hi"])
pairs=lines.map(lambda x:(x.split(" ")[0],x))

#在Scala中使用第一个单词作为键创建出一个pairRDD
val lines=sc.parallelize(List("pandas","i like pands"))
val pair=lines.map(x=>(x.split(" ")(0),x))

一个pairRDD的转化操作
reduceByKey(func)合并具有相同键的值。rdd.reduceByKey(x,y)=>x+y
groupByKey()对具有相同键的值进行分组。rdd.groupByKey()
combineByKey(createCombiner,meregeValue,mergeCombineers,partitioner)使用不同的返回类型合并具有相同相同键的值。
mapValue(func)对pairRDD中的每个值应用一个函数而不改变键。rdd.mapValues(x=x+1)
flatMapvalues(func) 对pairRdd中的每个值应用一个函数而不改变键。 rdd.mapValues(x=>x+1)
keys. 返回一个仅包含键的RDD。rdd.keys
values. 返回一个仅包含值的RDD。rdd.values
sortByKey() 返回一个根据键排序的RDD。rdd.sortBykey()

针对两个pairRDD的转化操作
subtractByKey 删掉RDD中键与otherRDD中的键相同的元素 rdd.subtractByKey(other)
join 对两个RDD内连接。 rdd .join(other)
rightOuterJoin. 对两个RDD进行连接操作,确保第一个RDD的键必须存在(右外连接)。 rdd.rightOuterJoin(other)
leftOuterJoin. 对两个RDD进行连接操作,确保第二个RDD的键必须存在(左外连接)。 rdd.leftOuterJoin(other)
cogroup将两个RDD中拥有相同键的数据分组到一起。 rdd.cogroup()

#使用python对第二个元素进行筛选
rdd_1=sc.parallelize([1,2,3])
rdd1_pairs=rdd_1.map(lambda x:(x*x,x))
rdd1_pairs.take(3)
//[(1, 1), (4, 2), (9, 3)]
rdd_filter=rdd1_pairs.filter(lambda keyValue: keyValue[1]<3)
rdd_filter.take(2)
//[(1, 1), (4, 2)]

//使用Scala对第二个元素进行筛选
val rdd_1=sc.parallelize(List(1,2,3))
val rdd1_pairs=rdd_1.map(x=>(x,x*x))
rdd1_pairs.take(3)
//Array[(Int, Int)] = Array((1,1), (2,4), (3,9))
val rdd1_filter=rdd1_pairs.filter{case (key,value)=> value<3}
rdd1_filter.take(3)
//Array[(Int, Int)] = Array((1,1))

聚合操作
当数据集以键值对形式组织的时候,聚合具有相同键的元素进行一些统计是很常见的操作。

# python 用reduceByKey()和mapValues()计算每个键对应的平均值
#.rdd1_pairs.    [(1, 1), (4, 2), (9, 3)]

rdd_reduce=rdd1_pairs.mapValues(lambda x:(x,1)).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
rdd_reduce.take(3)
//[(4, (2, 1)), (9, (3, 1)), (1, (1, 1))]


//scala用reduceByKey()和mapValues()计算每个键对应的平均值
//rdd1_pairs   Array((1,1), (2,4), (3,9))
val rdd1_reduce=rdd1_pairs.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
rdd1_reduce.take(3)
// Array[(Int, (Int, Int))] = Array((2,(4,1)), (1,(1,1)), (3,(9,1)))  


#用python实现单词计数
rdd=sc.textFile("/user/test/read.txt")
words=rdd.flatMap(lambda x:x.split(" "))
result=words.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
result.take(10)
#[(u'', 2), (u'feed', 1), (u'all', 1), (u'love', 1), (u'sky', 1), (u'comfortable,', 1), (u'liked', 1), (u'down', 1), (u'view,', 1)]


//scala 实现单词计数
val input=sc.textFile("/user/test/read.txt")
val words=input.flatMap(x => x.split(" "))
val result=words.map(x=>(x,1)).reduceByKey((x,y)=>x+y)
result.take(2)
// Array[(String, Int)] = Array((She,1), (comfortable,,1))     

数据分区
spark程序可以通过控制RDD分区方式来减少通信开销,当数据集多次在类似于连接操作这种基于键的操作中使用时,分区才有帮助。
获取RDD的分区方式

//获取RDD分区方式
//创建一个(Int,Int)对组成的RDD
import org.apache.spark
val pairs=sc.parallelize(List((1,1),(2,2),(3,3)))
pairs.partitioner
val partitioned=pairs.partitionBy(new spark.HashPartitioner(2))
partitioned.partitioner

PageRank

//scala PangRank
//假设相邻页面列表以Spark ObjectFile的形式存储
val links=sc.objectFiles[(String,Seq[String])]("links")
                                    .partitionBy(new HashPartitioner(100))
                                     .persist()
//每个页面的排序值初始化为1.0;由于使用mapValues,生成的RDD的分区方式会和"links"一样
var ranks=links.mapValues(v=>1.0)

//运行10轮的PanhRank迭代
for(i<-0 until 10){
val contributions=links.join(ranks).faltMap{
case (pageId,(links,rank))=>
links.map(dest=>(dest,rank/link.size))
}
ranks=contributions.reduceByKey((x,y)=>x+y).mapValues(v=>0.15+0.85*v)
}
//最终排名
ranks.saveAsTextFile("ranks")

自定义分区方式
实现自定义的分区,需要继承org.apache.spark.Partitioner类,并实现下面三个方法:
(1)numPartitions:Int:返回创建出来的分区数
(2)getPartition(key,Any):Int返回给定键的分区编号
(3)equals():Java判断相等性的标准方法
基于域名的分区器,这个分区器只对URL中的域名部分求哈希

//scala 自定义分区方式
class DomainNamePartitioner(numParts:Int) extends Partitioner{
override def numPartitions:Int=numParts
override def getPartition(key,Any):Int={
val domain=new Java.net.URL(key,toString).getHost()
val code=(domain.hashCode%numPartitions)
if(code<0):
code+numPartitions
}else{code}
}
override def equals(other:Any):Boolean=other match{
case dnp:DomainNamePartitioner=>dnp.numPartitions==numPartitions
case_=> false
}
}
# python自定义分区方式
#pathon 不需要扩展Partitioner 类,而是把一个特定的哈希函数作为额外的参数传给RDD.partitionBy()函数
import urlparse
def hash_domain(url):
  return hash(urlparse.urlparse(url).netloc)
rdd.partitionBy(20,hash_domain) #创建20个分区
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容