Spark的共享变量--累加变量和广播变量及其应用示例

一、为什么使用共享变量?

当Spark在使用某个算子对RDD进行运算时,如果需要用到外部变量,比如对RDD[Int]中的每个element乘以一个系数factor,得到一个新的RDD(定义在main方法中,driver进程):

val num = 3      # 外部变量
val newRdd = rdd.map(num => num * factor)    # transformation操作,lazy方式执行
newRdd.count      # action操作,立即执行

那么需要将factor从driver端通过网络传输到所有task中task数=RDD的partition数,每个task只能处理自己的那个factor副本),如果说1个executor负责处理多个task,那么该executor将会得到多份相同的factor,这种方式无疑增加了网络传输的开销和内存开销
很显然,如果同一个executor节点上的所有task能够共享同一份factor副本,那么将有效减小网络传输的开销和内存开销,这时候“共享变量”就闪亮登场了,这里的“共享“是指被多个task所共享。

Spark的共享变量包括两类:广播变量和累加变量

Spark共享变量的优点

上图红色箭头表示使用共享变量,蓝色箭头表示不使用共享变量。


二、Broadcast Variable 广播变量

2.1 广播变量的特点

  • 广播变量即将一个外部变量从driver端高效地通过网络发送到各executor节点,采取的是高效地“广播”方式,即先发送给离driver端最近(Spark内部定义了这种近的衡量方法)的一个executor(比如executor A),然后executor A会将该变量发送到它附近的executor,以此类推这种指数型增长的发送方式,可以有效地节省网络传输开销;
  • 一个application可能涉及到多个stage,在每个stage(driver中的DAG Scheduler将main方法代码解析得到DAG,分析每个stage得到该stage的taskset,taskset发送给task scheduler)中Spark会自动发送task所需的common data,只有当广播变量跨stage被task使用,才认为是有效的;
  • 由于广播变量会涉及到网络传输,因此必然涉及到对象的序列化和反序列化,driver端的外部变量被序列化之后得到byte数组,通过网络发送到集群其他机器的executor节点并以序列化的格式缓存在内存中,再反序列化为对象(在task执行之前完成);
  • executor只能读取(read-only)广播变量,而不能修改它;
  • 广播变量是从driver(即main方法)中定义并发出(sc.broadcast(factor)),在executor端接收并使用(broadcast.value),比如:
# 被广播的外部变量list
val sparkSession = SparkSession.builder().master("yarn").appName("Datalake")getOrCreate()
val sc = sparkSession.sparkContext
// driver端定义广播变量
val listB = sc.broadcast(list)
// 初始化操作,用mappartition比map更加高效
val rddMap: RDD[(Int, Row)] = oldRdd.mapPartitions {
     partition => {
       // initialization
       // executor端接收广播变量
       InitUtil.init(listB.value)
       partition.map(row => (getPartitionOrder(row), getNewRow(row)))
     }
}

2.2 典型case:小表和大表做join操作

大表和小表做join操作时,可以把小表broadcast到各个节点,从而就可以把join操作转变成普通的操作(hashmap.get),以避免耗时的shuffle操作。
代码可以参考博客:https://www.jianshu.com/p/0c77036ad01b


三、Accumulator 累加变量

3.1 累加变量特点

  • 累加变量主要用于多个task对同一个变量进行共享性(并行)的操作
  • 累加器主要分为三种,即:LongAccumulator(长整数)、DoubleAccumulator(浮点数)、CollectionAccumulator(集合)
  • 可以实现自定义的累加器(比如BigDecimalAccumulator),比如:
import java.math.BigInteger

import org.apache.spark.util.AccumulatorV2

class BigIntegerAccumulator extends AccumulatorV2 {
  var num = BigInteger.ZERO

  def BigIntegerAccumulator(num: BigInteger)  {
    this.num = num
  }

  override def isZero: Boolean = {
    num.compareTo(BigInteger.ZERO)
  }

  override def copy(): AccumulatorV2[Nothing, Nothing] = new BigIntegerAccumulator()

  override def reset(): Unit = {
    num = BigInteger.ZERO
  }

  override def add(v: Nothing): Unit = {

  }

  override def merge(other: AccumulatorV2[Nothing, Nothing]): Unit = {
    num = num.add(other.value)
  }

  override def value(): BigInteger = {
    num
  }
}




import org.apache.spark.sql.SparkSession

class Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    val sc = spark.sparkContext
    // 直接new自定义的累加器
    val bigIntegerAccumulator = new BigIntegerAccumulator()
    // 然后在SparkContext上注册一下
    sc.register(bigIntegerAccumulator, "bigIntegerAccumulator")
    bigIntegerAccumulator.reset()
    rdd.repartition(6).map(element => {
      println(element)
      bigIntegerAccumulator.add(new BigInteger("1"))
    })
    rdd.count()
    // rdd.count()
  }
}

  • 重点:累加变量的最终结果应该不受累加顺序的影响,比如StringAccumulator就是一个错误的例子,就相当于开了多个线程,每个线程随机sleep若干毫秒然后往StringBuffer中追加字符,但是最后追加出来的字符串是无法被预测的;
  • executor端只能对累加变量做累加操作,driver端只能读取累加变量的值;

3.2 使用陷进:避免重复累加

每执行一个task,就会对累加变量做一次累加操作。我们知道Spark存在两种操作,即transformation和action,前者是lazy方式执行,只有遇到action操作才会执行之前的transformation操作(当前action和上一个action之间的)。设想,如果一个application的main方法中,对一个RDD先后执行了1个transfromation操作和2个action操作的话,那么同样的transformation操作会被执行两次,这就会导致执行了2*partition个数(个)task,那么累加变量就被重复多做了“parttiion个数”次累加操作。比如:

class Main {
  def main(args: Array[String]): Unit = {
    ......
    rdd.repartition(6).map(element => {
      println(element)
      bigIntegerAccumulator.add(new BigInteger("1"))
    })
    rdd.count()
    rdd.count()
  }
}

解决方法就是:对上一次的transformation计算结果进行cache,这样的话遇到第二个count操作时就不会再做一次transformation操作了,比如:

class Main {
  def main(args: Array[String]): Unit = {
    ......
    rdd.repartition(6).map(element => {
      println(element)
      bigIntegerAccumulator.add(new BigInteger("1"))
    }).cache()
    rdd.count()
    rdd.count()
  }
}

执行第一个count时,会执行 rdd.repartition(6).map() 操作并将结果进行了cache;执行第二个count时,不会再执行rdd.repartition(6).map() 操作。


参考:
https://blog.csdn.net/qq_35866165/article/details/86671302

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343