学习博主的文章《Spark driver端得到executor返回值的方法》之后有感
文章中通过阅读count方法的源码,count方法将task的返回值返回到driver端,然后进行聚合,源码如下
defcount(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
sparkcontext的runJob方法,Utils.getIteratorSize _这个方法主要是计算每个iterator的元素个数,也即是每个分区的元素个数,返回值就是元素个数:
/**
* Counts the number of elements of an iterator using a while loop rather than calling
* [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower
* in the current version of Scala.
*/
def getIteratorSize(iterator:Iterator[_]):Long = {
var count =0L
while (iterator.hasNext) {
count +=1L
iterator.next()
}
count
}
返回结果为各个分区的元素个数,使用sum方法进行统计。
博主的文章中使用实际案例代码说明了使用方法,学习后有如下启发,在driver端不止可以获取每个task中的数据量,是否还可以获取具体的变量值呢?做如下测试:
val rdd = sc.parallelize(1 to 10 ,3)
import org.apache.spark.TaskContext
import scala.collection.mutable._
val funccc = (itr : Iterator[Int]) => {
val lst = new ListBuffer[Int]
itr.foreach(each=>{
lst.append(each)
})
(TaskContext.getPartitionId(),lst)
}
val res = sc.runJob(rdd,funccc)
在driver端查看获取的结果
res(1)._2.foreach(println)
4
5
6