一、前述
Spark中因为算子中的真正逻辑是发送到Executor中去运行的,所以当Executor中需要引用外部变量时,需要使用广播变量。
累机器相当于统筹大变量,常用于计数,统计。
二、具体原理
1、广播变量
- 广播变量理解图
- 注意事项
1、能不能将一个RDD使用广播变量广播出去?
不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。
2、 广播变量只能在Driver端定义,不能在Executor端定义。
3、 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。
5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
val conf = new SparkConf()
conf.setMaster("local").setAppName("brocast")
val sc = new SparkContext(conf)
val list = List("hello","world")
val broadCast = sc.broadcast(list)
val lineRDD = sc.textFile("input/*")
lineRDD.filter { x => {println(broadCast.value);broadCast.value.contains(x)} }.collect().foreach { println}
sc.stop()
2、累加器
- 累加器理解图
Scala代码:
有问题
object acculateDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("sortDemo")
val sc = new SparkContext(conf)
var accumulator = 0
sc.textFile("input/*",2).foreach {//两个变量
x =>{accumulator += x.toInt
println(accumulator)}}
println(accumulator)
sc.stop()
}
}
正确
import org.apache.spark.{SparkConf, SparkContext}
object AccumulatorOperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.longAccumulator
sc.textFile("./records.txt",2).foreach {//两个变量
x =>{accumulator.add(1)
println(accumulator)}}
println(accumulator.value)
sc.stop()
}
}
结果:
package com.neusoft
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by ttc on 2018/10/17.
*/
object acculateDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("/root/words.txt",10)
val sum = sc.collectionAccumulator[String]
rdd.map(x=>{
sum.add(x)
}).collect()
println("sum is " + sum.value)
sc.stop()
}
}
注意事项
累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。