通常情况下,当向Spark操作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本。这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向驱动程序回传。在任务之间使用通用的,支持读写的共享变量是低效的。尽管如此,Spark提供了两种有限类型的共享变量,广播变量和累加器。
1.累加器
提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见的用途是在调试时对作业的执行过程中事件进行计数。
一个计算空行的例子
import org.apache.spark.{SparkContext, SparkConf}
/**
* Created by chh on 2016/5/22.
*/
object Counter {
def main(args :Array[String]): Unit = {
//创建一个scala版本的SparkContext
val conf = new SparkConf().setAppName("Counter").setMaster("local")
val sc = new SparkContext(conf)
val file=sc.textFile("file.txt")
val blankLines =sc.accumulator(0)//创建Accumulator[Int]并初始化为0
val callSigns =file.flatMap(line => {
if(line == ""){
blankLines += 1 //累加器加一
}
line.split(" ")
})
callSigns.saveAsTextFile("output.txt")
println(blankLines.value)
}
}
注:
1.累加器的值只有在驱动器程序中访问,所以检查也应当在驱动器程序中完成。
2.对于行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此如果想要一个无论在失败还是在重新计算时候都绝对可靠的累加器,必须把它放在foreach()这样的行动操作中。
而转化操作最好在调试中使用
2.广播变量
广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。
一个Executor只需要在第一个Task启动时,获得一份Broadcast数据,之后的Task都从本节点的BlockManager中获取相关数据。
步骤
1.调用SparkContext.broadcast方法创建一个Broadcast[T]对象。
任何序列化的类型都可以这么实现。
2.通过value属性访问改对象的值(Java之中为value()方法)
3.变量只会被发送到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)
修改为:.广播变量方式
3.分区操作
(1)mapPartitions
def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。如果在映射的过程中需要频繁创建额外的对象(如数据库连接对象),使用mapPartitions要比map高效的多。
比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。
var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有两个分区
scala> var rdd3 = rdd1.mapPartitions{ x => {
| var result = List[Int]()
| var i = 0
| while(x.hasNext){
| i += x.next()
| }
| result.::(i).iterator
| }}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[84] at mapPartitions at :23
//rdd3将rdd1中每个分区中的数值累加
scala> rdd3.collect
res65: Array[Int] = Array(3, 12)
scala> rdd3.partitions.size
res66: Int = 2
(2)mapPartitionsWithIndex
def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。
<template>
<div>
<div class="type-item">
<div class="type-image">
</div>
<div class="content">
<h3>个人</h3>
<p>
适合垂直领域专家、意见领袖等自媒体人士申请
</p>
</div>
<div class="enter" @click="nextStage('个人')">
<span>入驻</span>
</div>
</div>
<div class="type-item">
<div class="type-image">
</div>
<div class="content">
<h3>媒体</h3>
<p>
面向报纸、杂志、电视、广播等其他以内容生产为主的组织机构
</p>
</div>
<div class="enter" @click="nextStage('媒体')">
<span>入驻</span>
</div>
</div>
<div class="type-item">
<div class="type-image">
</div>
<div class="content">
<h3>企业/机构</h3>
<p>
面向企业、公司、分支机构、社团、民间组织等机构团体
</p>
</div>
<div class="enter" @click="nextStage('企业/机构')">
<span>入驻</span>
</div>
</div>
<div class="type-item">
<div class="type-image">
</div>
<div class="content">
<h3>政府</h3>
<p>
面向国内各省市区的各级党政机关
</p>
</div>
<div class="enter" @click="nextStage('政府')">
<span>入驻</span>
</div>
</div>
</div>
</template>
<script>
export default {
data : function(){
return {
}
},
methods: {
nextStage: function(type) {
console.log(type);
window.location.href="#/registor/fill"
}
}
}
</script>
<style scoped>
.type-item {
position: relative;
box-sizing: border-box;
padding: 40px 140px 30px 150px;
border-bottom: 1px solid #cdcdcd;/*no*/
}
.type-item:nth-of-type(1) {
margin-top: 40px;
}
.type-item:nth-of-type(1) .type-image{
background-image: url('../../assets/images/registor/geren.png');
}
.type-item:nth-of-type(2) .type-image{
background-image: url('../../assets/images/registor/meiti.png');
}
.type-item:nth-of-type(3) .type-image{
background-image: url('../../assets/images/registor/qiye.png');
}
.type-item:nth-of-type(4) .type-image{
background-image: url('../../assets/images/registor/zhengfu.png');
}
.type-item:nth-last-of-type(1) {
border-bottom: 0px;
margin-bottom: 80px;
}
.type-item .type-image {
position: absolute;
left: 0px;
width: 118px;
height: 118px;
border-radius: 50%;
background-color: #fdb300;
background-repeat: no-repeat;
background-size: 55px 50px;
background-position: center;
border-bottom: 1px solid #cdcdcd;
}
.type-item .content h3 {
font-size: 34px;
line-height: 45px;
color: #666666;
margin-top: -5px;
}
.type-item .content p {
font-size:26px;
color: #999999;
margin-top: 16px;
}
.type-item .enter {
position: absolute;
box-sizing: border-box;
top: 50%;
-webkit-transform: translate(0, -50%);
transform: translate(0, -50%);
right: 0px;
width: 120px;
font-size: 26px;
color: #666666;
border-radius: 10px;
border: 2px solid #fdb300;
text-align: center;
vertical-align: center;
padding: 15px 0px;
}
</style>
var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有两个分区
var rdd2 = rdd1.mapPartitionsWithIndex{
(x,iter) => {
var result = List[String]()
var i = 0
while(iter.hasNext){
i += iter.next()
}
result.::(x + "|" + i).iterator
}
}
//rdd2将rdd1中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)