键值对RDD是spark中许多操作所需要的常见数据类型。键值对RDD通常用来进行聚合计算,一般要先通过一些初始的ETL(抽取、转化、装载)操作来将数据转化为键值对形式。
spark为包含键值对类型的RDD提供了一些专有的操作,这些RDD被称为pairRDD。
创建pairRDD
#在python中使用第一个单词作为键创建出一个pairRDD
lines = sc.parallelize(["hello world","hi"])
pairs=lines.map(lambda x:(x.split(" ")[0],x))
#在Scala中使用第一个单词作为键创建出一个pairRDD
val lines=sc.parallelize(List("pandas","i like pands"))
val pair=lines.map(x=>(x.split(" ")(0),x))
一个pairRDD的转化操作
reduceByKey(func)合并具有相同键的值。rdd.reduceByKey(x,y)=>x+y
groupByKey()对具有相同键的值进行分组。rdd.groupByKey()
combineByKey(createCombiner,meregeValue,mergeCombineers,partitioner)使用不同的返回类型合并具有相同相同键的值。
mapValue(func)对pairRDD中的每个值应用一个函数而不改变键。rdd.mapValues(x=x+1)
flatMapvalues(func) 对pairRdd中的每个值应用一个函数而不改变键。 rdd.mapValues(x=>x+1)
keys. 返回一个仅包含键的RDD。rdd.keys
values. 返回一个仅包含值的RDD。rdd.values
sortByKey() 返回一个根据键排序的RDD。rdd.sortBykey()
针对两个pairRDD的转化操作
subtractByKey 删掉RDD中键与otherRDD中的键相同的元素 rdd.subtractByKey(other)
join 对两个RDD内连接。 rdd .join(other)
rightOuterJoin. 对两个RDD进行连接操作,确保第一个RDD的键必须存在(右外连接)。 rdd.rightOuterJoin(other)
leftOuterJoin. 对两个RDD进行连接操作,确保第二个RDD的键必须存在(左外连接)。 rdd.leftOuterJoin(other)
cogroup将两个RDD中拥有相同键的数据分组到一起。 rdd.cogroup()
#使用python对第二个元素进行筛选
rdd_1=sc.parallelize([1,2,3])
rdd1_pairs=rdd_1.map(lambda x:(x*x,x))
rdd1_pairs.take(3)
//[(1, 1), (4, 2), (9, 3)]
rdd_filter=rdd1_pairs.filter(lambda keyValue: keyValue[1]<3)
rdd_filter.take(2)
//[(1, 1), (4, 2)]
//使用Scala对第二个元素进行筛选
val rdd_1=sc.parallelize(List(1,2,3))
val rdd1_pairs=rdd_1.map(x=>(x,x*x))
rdd1_pairs.take(3)
//Array[(Int, Int)] = Array((1,1), (2,4), (3,9))
val rdd1_filter=rdd1_pairs.filter{case (key,value)=> value<3}
rdd1_filter.take(3)
//Array[(Int, Int)] = Array((1,1))
聚合操作
当数据集以键值对形式组织的时候,聚合具有相同键的元素进行一些统计是很常见的操作。
# python 用reduceByKey()和mapValues()计算每个键对应的平均值
#.rdd1_pairs. [(1, 1), (4, 2), (9, 3)]
rdd_reduce=rdd1_pairs.mapValues(lambda x:(x,1)).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
rdd_reduce.take(3)
//[(4, (2, 1)), (9, (3, 1)), (1, (1, 1))]
//scala用reduceByKey()和mapValues()计算每个键对应的平均值
//rdd1_pairs Array((1,1), (2,4), (3,9))
val rdd1_reduce=rdd1_pairs.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
rdd1_reduce.take(3)
// Array[(Int, (Int, Int))] = Array((2,(4,1)), (1,(1,1)), (3,(9,1)))
#用python实现单词计数
rdd=sc.textFile("/user/test/read.txt")
words=rdd.flatMap(lambda x:x.split(" "))
result=words.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
result.take(10)
#[(u'', 2), (u'feed', 1), (u'all', 1), (u'love', 1), (u'sky', 1), (u'comfortable,', 1), (u'liked', 1), (u'down', 1), (u'view,', 1)]
//scala 实现单词计数
val input=sc.textFile("/user/test/read.txt")
val words=input.flatMap(x => x.split(" "))
val result=words.map(x=>(x,1)).reduceByKey((x,y)=>x+y)
result.take(2)
// Array[(String, Int)] = Array((She,1), (comfortable,,1))
数据分区
spark程序可以通过控制RDD分区方式来减少通信开销,当数据集多次在类似于连接操作这种基于键的操作中使用时,分区才有帮助。
获取RDD的分区方式
//获取RDD分区方式
//创建一个(Int,Int)对组成的RDD
import org.apache.spark
val pairs=sc.parallelize(List((1,1),(2,2),(3,3)))
pairs.partitioner
val partitioned=pairs.partitionBy(new spark.HashPartitioner(2))
partitioned.partitioner
PageRank
//scala PangRank
//假设相邻页面列表以Spark ObjectFile的形式存储
val links=sc.objectFiles[(String,Seq[String])]("links")
.partitionBy(new HashPartitioner(100))
.persist()
//每个页面的排序值初始化为1.0;由于使用mapValues,生成的RDD的分区方式会和"links"一样
var ranks=links.mapValues(v=>1.0)
//运行10轮的PanhRank迭代
for(i<-0 until 10){
val contributions=links.join(ranks).faltMap{
case (pageId,(links,rank))=>
links.map(dest=>(dest,rank/link.size))
}
ranks=contributions.reduceByKey((x,y)=>x+y).mapValues(v=>0.15+0.85*v)
}
//最终排名
ranks.saveAsTextFile("ranks")
自定义分区方式
实现自定义的分区,需要继承org.apache.spark.Partitioner类,并实现下面三个方法:
(1)numPartitions:Int:返回创建出来的分区数
(2)getPartition(key,Any):Int返回给定键的分区编号
(3)equals():Java判断相等性的标准方法
基于域名的分区器,这个分区器只对URL中的域名部分求哈希
//scala 自定义分区方式
class DomainNamePartitioner(numParts:Int) extends Partitioner{
override def numPartitions:Int=numParts
override def getPartition(key,Any):Int={
val domain=new Java.net.URL(key,toString).getHost()
val code=(domain.hashCode%numPartitions)
if(code<0):
code+numPartitions
}else{code}
}
override def equals(other:Any):Boolean=other match{
case dnp:DomainNamePartitioner=>dnp.numPartitions==numPartitions
case_=> false
}
}
# python自定义分区方式
#pathon 不需要扩展Partitioner 类,而是把一个特定的哈希函数作为额外的参数传给RDD.partitionBy()函数
import urlparse
def hash_domain(url):
return hash(urlparse.urlparse(url).netloc)
rdd.partitionBy(20,hash_domain) #创建20个分区