5. 第四章 键值对操作

1. 创建Pair RDD

//Scala中使用第一个单词作为键创建出一个pair RDD
val lines = sc.textFile("/path/README.md")
val pairs = lines.map(x => (x.split(" ")(0), x))
//Java中使用第一个单词作为键创建出一个pair RDD
JavaRDD<String> lines = sc.textFile("/path/README.md")
JavaPairRDD<String, String> pairs = lines.mapToPair(x -> new Tuple2(x.split(" ")[0], x))

当用Scala 和Python 从一个内存中的数据集创建pair RDD 时,只需要对这个由二元组组成的集合调用SparkContext.parallelize() 方法。而要使用Java 从内存数据集创建pair RDD的话,则需要使用SparkContext.parallelizePairs()。

//Java中使用parallelizePairs创建pair RDD
List<Tuple2<String,Integer>> lt = new ArrayList<>();
Tuple2<String,Integer> tp1 = new Tuple2<>("pinda", 2);
Tuple2<String,Integer> tp2 = new Tuple2<>("qank", 6);
Tuple2<String,Integer> tp3 = new Tuple2<>("panda", 5);
lt.add(tp1);
lt.add(tp2);
lt.add(tp3);
JavaPairRDD<String,Integer> data = js.parallelizePairs(lt);

2. Pair RDD的转化操作

2.1 基本转化操作

Pair RDD可以使用所有标准RDD上的可用的转化操作。

//Scala筛选掉长度超过20个字符的行
pairs.filter{case (key, value) => value.length < 20}
//Java筛选掉长度超过20个字符的行
Function<Tuple2<String, String>, Boolean> longWordFilter =
  new Function<Tuple2<String, String>, Boolean>() {
    public Boolean call(Tuple2<String, String> keyValue) {
      return (keyValue._2().length() < 20);
  }
};
JavaPairRDD<String, String> result = pairs.filter(longWordFilter);
Pair RDD的转化操作
针对两个Pair RDD的转化操作
2.2 聚合操作
//在Scala 中使用reduceByKey() 和mapValues() 计算每个键对应的平均值
val rdd = sc.parallelize(List(("panda", 0), ("pink", 3), ("priate", 3), ("panda", 1), ("pink", 4)))
val keyMean = rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
  • 使用reduceByKey() 和mapValues() 计算每个键对应的平均值的数据流
key value =>mapValues=> key value =>reduceByKey=> key value
panda 0 panda (0, 1) panda (1, 2)
pink 3 pink (3, 1) pink (7, 2)
pirate 3 pirate (3, 1) pirate (3, 1)
panda 1 panda (1, 1)
pink 4 pink (4, 1)
  • 使用flatMap() 和 map()来生成以单词为键、以数字1 为值的pair RDD,然后使用reduceByKey() 对所有的单词进行计数。
//用Scala 实现单词计数
val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
//对RDD input使用countByValue() 函数,以更快地实现单词计数
val result = input.flatMap(x => x.split(" ")).countByValue()
//用Java 实现单词计数
JavaRDD<String> input = sc.textFile("s3://...")
JavaRDD<String> words = input.flatMap(line -> line.split(" "));
JavaPairRDD<String, Integer> result = words.mapToPair(x -> new Tuple2(x, 1)).
    reduceByKey((x, y) -> x + y);
  • combineByKey() 是最为常用的基于键进行聚合的函数。大多数基于键聚合的函数都是用它实现的。和aggregate() 一样,combineByKey() 可以让用户返回与输入数据的类型不同的返回值。
    combineByKey() 有多个参数分别对应聚合操作的各个阶段,因而非常适合用来解释聚合操作各个阶段的功能划分。

    1. combineByKey的定义
    def combineByKey[C](  
        createCombiner: V => C,  
        mergeValue: (C, V) => C,  
        mergeCombiners: (C, C) => C,  
        partitioner: Partitioner,  
        mapSideCombine: Boolean = true,  
        serializer: Serializer = null )  
    
    1. 解释下3个重要的函数参数:
    • createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)
    • mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)
    • mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)
    1. 使用combineByKey来求解平均数的例子
    //在Scala 中使用combineByKey() 求每个键对应的平均值
    val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), 
       ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))  
    val rdd = sc.parallelize(initialScores)  
    type MVType = (Int, Double) //定义一个元组类型(科目计数器,分数)  
    rdd.combineByKey(  
      score => (1, score),  
      (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),  
      (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)  
    ).map { case (name, (num, socre)) => (name, socre / num) }.collect  
    
    1. 参数含义的解释
    • score => (1, score),我们把分数作为参数,并返回了附加的元组类型。 以"Fred"为列,当前其分数为88.0 =>(1,88.0) 1表示当前科目的计数器,此时只有一个科目
    • (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),注意这里的c1就是createCombiner初始化得到的(1,88.0)。在一个分区内,我们又碰到了"Fred"的一个新的分数91.0。当然我们要把之前的科目分数和当前的分数加起来即c1._2 + newScorez,然后把科目计算器加1即c1._1 + 1
    • (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2),注意"Fred"可能是个学霸,他选修的科目可能过多而分散在不同的分区中。所有的分区都进行mergeValue后,接下来就是对分区间进行合并了,分区间科目数和科目数相加分数和分数相加就得到了总分和总科目数
    1. 执行结果
    res1: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))  
    
combineByKey()数据流示意图
//Java 中使用combineByKey() 求每个键对应的平均值
public static class AvgCount implements Serializable {
  public AvgCount(int total, int num) { total_ = total; num_ = num; }
  public int total_;
  public int num_;
  public float avg() { returntotal_/(float)num_; }
}
Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() {
  public AvgCount call(Integer x) {
    return new AvgCount(x, 1);
  }
};
Function2<AvgCount, Integer, AvgCount> addAndCount =
  new Function2<AvgCount, Integer, AvgCount>() {
    public AvgCount call(AvgCount a, Integer x) {
      a.total_ += x;
      a.num_ += 1;
      return a;
   }
};
Function2<AvgCount, AvgCount, AvgCount> combine =
  new Function2<AvgCount, AvgCount, AvgCount>() {
    public AvgCount call(AvgCount a, AvgCount b) {
    a.total_ += b.total_;
    a.num_ += b.num_;
    return a;
  }
};
AvgCount initial = new AvgCount(0,0);
JavaPairRDD<String, AvgCount> avgCounts =
    nums.combineByKey(createAcc, addAndCount, combine);
Map<String, AvgCount> countMap = avgCounts.collectAsMap();
for (Entry<String, AvgCount> entry : countMap.entrySet()) {
  System.out.println(entry.getKey() + ":" + entry.getValue().avg());
}
  • 数据分组 groupByKey() cogroup()
  • 连接 join() rightOuterJoin() leftOuterJoin()
  • 数据排序 sortByKey()

3. Pair RDD的行动操作

和转化操作一样,所有基础RDD支持的行动操作也都在pair RDD上可用。

一些额外的行动操作

4. 数据分区

4.1 获取RDD的数据分区

你可以使用RDD 的partitioner 属性(Java 中使用partitioner() 方法)来获取RDD 的分区方式。它会返回一个scala.Option 对象,这是Scala 中用来存放可能存在的对象的容器类。你可以对这个Option 对象调用isDefined() 来检查其中是否有值,调用get() 来获取其中的值

scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
//创建一个由(Int, Int) 对组成的RDD
scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12
//初始时没有分区方式信息(一个值为None 的Option 对象)。
scala> pairs.partitioner
res0: Option[spark.Partitioner] = None
//对第一个RDD 进行哈希分区,创建出了第二个RDD
scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2)).persist()
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14
//对RDD 完成哈希分区操作
scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)
4.2 从分区中获益的操作

Spark 的许多操作都引入了将数据根据键跨节点进行混洗的过程。所有这些操作都会从数据分区中获益。能够从数据分区中获益的操作有cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey() 以及lookup()。

4.3 影响分区方式的操作

会为生成的结果RDD设好分区方式的操作:cogroup()、groupWith()、join()、lef tOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、partitionBy()、sort()、mapValues()(如果父RDD 有分区方式的话)、flatMapValues()(如果父RDD 有分区方式的话),以及filter()(如果父RDD 有分区方式的话)。其他所有的操作生成的结果都不会存在特定的分区方式。
最后,对于二元操作,输出数据的分区方式取决于父RDD 的分区方式。默认情况下,结果会采用哈希分区,分区的数量和操作的并行度一样。不过,如果其中的一个父RDD 已经设置过分区方式,那么结果就会采用那种分区方式;如果两个父RDD 都设置过分区方式,结果RDD 会采用第一个父RDD 的分区方式。

4.4 自定义分区

要实现自定义的分区器,你需要继承org.apache.spark.Partitioner 类并实现下面三个方法。

  • numPartitions: Int:返回创建出来的分区数。
  • getPartition(key: Any): Int:返回给定键的分区编号(0 到 numPartitions-1)。
  • equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样Spark 才可以判断两个RDD 的分区方式是否相同。
4.4.1 数据分区示例

举个例子,假设我们要在一个网页的集合上运行前一节中的PageRank 算法。在这里,每个页面的ID(RDD 中的键)是页面的URL。当我们使用简单的哈希函数进行分区时,拥有相似的URL 的页面(比如http://www.cnn.com/WORLDhttp://www.cnn.com/US)可能会被分到完全不同的节点上。然而,我们知道在同一个域名下的网页更有可能相互链接。由于PageRank 需要在每次迭代中从每个页面向它所有相邻的页面发送一条消息,因此把这些页面分组到同一个分区中会更好。可以使用自定义的分区器来实现仅根据域名而不是整个URL 来分区。

//使用Scala自定义分区
class DomainNamePartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    val domain = new Java.net.URL(key.toString).getHost()
    val code = (domain.hashCode % numPartitions)
    if(code < 0) {
      code + numPartitions // 使其非负
    }else{
      code
    }
  }
  // 用来让Spark区分分区函数对象的Java equals方法
  override def equals(other: Any): Boolean = other match {
    case dnp: DomainNamePartitioner =>
      dnp.numPartitions == numPartitions
    case _ =>
      false
  }
}

注意,在equals() 方法中,使用Scala 的模式匹配操作符(match)来检查other 是否是DomainNamePartitioner,并在成立时自动进行类型转换;这和Java 中的instanceof() 是一样的。

使用自定义的Partitioner 是很容易的:只要把它传给partitionBy() 方法即可。Spark 中有许多依赖于数据混洗的方法,比如join() 和groupByKey(),它们也可以接收一个可选的Partitioner 对象来控制输出数据的分区方式。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容