6. Spark数据分区示例:PageRank

1. 算法简介

PageRank 是执行多次连接的一个迭代算法,因此它是RDD 分区操作的一个很好的用例。算法会维护两个数据集:

  • 一个由(pageID, linkList) 的元素组成,包含每个页面的相邻页面的列表;
  • 另一个由(pageID, rank) 元素组成,包含每个页面的当前排序值。

它按如下步骤进行计算:

  1. 将每个页面的排序值初始化为1.0。
  2. 在每次迭代中,对页面p,向其每个相邻页面(有直接链接的页面)发送一个值为rank(p)/numNeighbors(p) 的贡献值。
  3. 将每个页面的排序值设为0.15 + 0.85 * contributionsReceived。

最后两步会重复几个循环,在此过程中,算法会逐渐收敛于每个页面的实际PageRank 值。在实际操作中,收敛通常需要大约10 轮迭代。

2. 数据模拟

假设一个由4个页面组成的小团体:A,B,C,D。相邻页面如下所示:
A:B C
B:A C
C:A B D
D:C

4. 测试代码

// Scala版PageRank
import org.apache.spark.HashPartitioner  
// 假设相邻页面列表以Spark objectFile的形式存储
val links = sc.parallelize(List(
  ("A",List("B","C")),
  ("B",List("A","C")),
  ("C",List("A","B","D")),
  ("D",List("C"))
)).partitionBy(new HashPartitioner(100))
  .persist()  
// 将每个页面的排序值初始化为1.0;由于使用mapValues,生成的RDD
// 的分区方式会和"links"的一样
var ranks = links.mapValues(v => 1.0)
// 运行10轮PageRank迭代
for(i <- 0 until 10) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size))
  }
  ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
}
// 写出最终排名
ranks.sortByKey().collect().foreach(println)
运行结果

5. 运行过程分析

初始的linksRDD和ranksRDD如下所示:

//linksRDD:
Array[(String, List[String])] = Array(
  (A,List(B, C)), 
  (B,List(A, C)), 
  (C,List(A, B, D)), 
  (D,List(C)) )
//ranksRDD:
Array[(String, Double)] = Array((A,1.0), (B,1.0), (C,1.0), (D,1.0))

首次迭代后的contributionsRDD和ranksRDD如下所示:

//contributionsRDD:
Array[(String, Double)] = Array(
 (A,0.5), (A,0.3333333333333333), 
 (B,0.5), (B,0.3333333333333333), 
 (C,0.5), (C,0.5), (C,1.0), (D,0.3333333333333333))
//ranksRDD:
Array[(String, Double)] = Array(
 (A,0.8583333333333333),
 (B,0.8583333333333333), 
 (C,1.8499999999999999), 
 (D,0.43333333333333335) )

验证数据:
第1次迭代:

PR(A)=0.15 + 0.85 * (1/2 + 1/3) = 0.858333
PR(B)=0.15 + 0.85 * (1/2 + 1/3) = 0.858333
PR(C)=0.15 + 0.85 * (1/2 + 1/2 + 1/1) = 1.85
PR(D)=0.15 + 0.85 * (1/3) = 0.433333

第2次迭代:

PR(A)=0.15 + 0.85 * (0.858333/2 + 1.85/3) = 1.038958191100
PR(B)=0.15 + 0.85 * (0.858333/2 + 1.85/3) = 1.038958191100
PR(C)=0.15 + 0.85 * (0.858333/2 + 0.858333/2 + 0.433333/1) = 1.247916100000
PR(D)=0.15 + 0.85 * (1.85/3) = 0.67416667

第3次迭代:

PR(A)=0.15 + 0.85 * (1.038958191100/2 + 1.247916100000/3) = 0.945133459550833333
PR(B)=0.15 + 0.85 * (1.038958191100/2 + 1.247916100000/3) = 0.945133459550833333
PR(C)=0.15 + 0.85 * (1.038958191100/2 + 1.038958191100/2 + 0.67416667/1) = 1.606156131935
PR(D)=0.15 + 0.85 * (1.247916100000/3) = 0.503576228333333333

首先对当前的ranksRDD和静态的linksRDD 进行一次join() 操作,来获取每个页面ID对应的相邻页面列表和当前的排序值,然后使用flatMap创建出“contributions”来记录每个页面对各相邻页面的贡献。然后再把这些贡献值按照页面ID(根据获得共享的页面)分别累加起来,把该页面的排序值设为0.15 + 0.85 * contributionsReceived。

虽然代码本身很简单,这个示例程序还是做了不少事情来确保RDD 以比较高效的方式进行分区,以最小化通信开销:

  1. 请注意,linksRDD 在每次迭代中都会和ranks 发生连接操作。由于links 是一个静态数据集,所以我们在程序一开始的时候就对它进行了分区操作,这样就不需要把它通过网络进行数据混洗了。实际上,linksRDD 的字节数一般来说也会比ranks 大很多,毕竟它包含每个页面的相邻页面列表(由页面ID 组成),而不仅仅是一个Double 值,因此这一优化相比PageRank 的原始实现(例如普通的MapReduce)节约了相当可观的网络通信开销。
  2. 出于同样的原因,我们调用links 的persist() 方法,将它保留在内存中以供每次迭代使用。
  3. 当我们第一次创建ranks 时,我们使用mapValues() 而不是map() 来保留父RDD(links)的分区方式,这样对它进行的第一次连接操作就会开销很小。
  4. 在循环体中,我们在reduceByKey() 后使用mapValues();因为reduceByKey() 的结果已经是哈希分区的了,这样一来,下一次循环中将映射操作的结果再次与links 进行连接操作时就会更加高效。

注意:为了最大化分区相关优化的潜在作用,你应该在无需改变元素的键时尽量使用mapValues() 或flatMapValues()。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,905评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,140评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,791评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,483评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,476评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,516评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,905评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,560评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,778评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,557评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,635评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,338评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,925评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,898评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,142评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,818评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,347评论 2 342

推荐阅读更多精彩内容