Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)。累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象。
1、广播变量的引入:
(1)没有引入广播变量前的情形:
Spark 会自动把闭包中所有引用到的变量发送到工作节点上。虽然这很方便,但也很低效。原因有二:首先,默认的任务发射机制是专门为小任务进行优化的;其次,事实上你可能会在多个并行操作中使用同一个变量,但是 Spark 会为每个操作分别发送。
用一段代码来更直观的解释:
上述代码中:
list是在driver端创建的,但是因为需要在excutor端执行task,所以driver会把list以task的形式发送到excutor端,如果有很多个task,就会有很多给excutor端携带很多个list,如果这个list非常大的时候,就可能会造成内存溢出(如下图所示)。这个时候就引出了广播变量。上述代码的filter算子每处理一个line,就会发送给executor一个task(包含一个list)。
(2)引入广播变量:
将list设置为广播变量,当executor端用到这个list时,在每个executor端会有一个叫blockManager的对象来管理被声明为广播变量。 下次driver再发送task后,就不需要再携带list了。因为此时task到达executor,首先去blockManager中查找是否有执行本次task需要的广播变量。如果存在,直接拿来用。但是广播变量在executor端是不可以被改变的。 综上:
- 不能将RDD广播除去,可以将RDD的结果广播出去,例如rdd.collect()
- 广播变量只能在drvier端进行定义,在executor端使用,不能在executor中改变广播变量的值(如果在程序中改变的话,直接报错哦)。
具体的使用:
[scala代码]
具体的使用
[java代码]
2、累加器
(1)问题抛出:
那么有没有一种功能,可以让driver统计所有executor执行变量的累加和呢?即让driver打印变量的值为100呢?我们的累加器可以实现。
(2)累加器的使用
[scala代码]
[java代码]
关于累加器需要注意的是:
(1)如下图注释所示,在executor中打印累加器,spark1.6和spark2.3还是有区别的。
(2)累加器并须在driver端进行定义。(不能在executor端进行定义,需要使用sc进行定义)
(3)在1.6版本直接使用sc.accumulator(0),直接传一个初始值(这里是0)就可以了。但是在2.3版本中有了区别:
另外就是两个版本中自定义累加器也有比较大的变化。