Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算。 在 Scala 中,我们可以把定义的内联函数、方法的引用或静态方法传递给 Spark,就像 Scala 的其他函数式 API 一样。我们还要考虑其他一些细节,比如所传递的函数及其引用 的数据需要是可序列化的(实现了 Java 的 Serializable 接口)。 传递一个对象的方法或者字段时,会包含对整个对象的引用
class SearchFunctions(val query: String) extends java.io.Serializable{
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchesFunctionReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = {
// 问题:"isMatch"表示"this.isMatch",因此我们要传递整个"this"
rdd.filter(isMatch)
}
def getMatchesFieldReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = {
// 问题:"query"表示"this.query",因此我们要传递整个"this"
rdd.filter(x => x.contains(query))
}
def getMatchesNoReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = {
// 安全:只把我们需要的字段拿出来放入局部变量中
val query_ = this.query
rdd.filter(x => x.contains(query_))
}
}
如果在 Scala 中出现了 NotSerializableException,通常问题就在于我们传递了一个不可序列化的类中的函数或字段