spark(spark-2.10)算子(如map、filter等)的源码实现,都会调用ClosureCleaner.clean对传入的function进行检查和清理。其中有一步检查是,如果function包含了return,则直接失败。有关代码如下:
// Fail fast if we detect return statements in closures
getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
比如说,写下如下代码:
val rdd = sc.parallelize(1 to 10)
println(rdd.map(x => return x * 2).collect())
你会发现运行时抛出如下异常:
Exception in thread "main" org.apache.spark.util.ReturnStatementInClosureException: Return statements aren't allowed in Spark closures
可是为什么spark会做这个检查呢?这和scala中对return的实现有关。
先来聊一聊anonymous function中的return
scala中的return和java中的return,是不一样的。在scala的anonymous function(后面也简写为anonymous func)中使用return时,代码并不像看起来的那样。
先看一段代码:
def main(args: Array[String]) {
def d1(n:Int): Int = n * 2 //定义d1,将输入值乘以2,未使用return
def d2(n:Int): Int = return n * 2 //定义d2,将输入值乘以2,使用了return
val list = List(1, 2, 3)
println(list.map(d1)) //调用d1将list中的值都乘以2
println(list.map(d2)) //调用d2将list中的值都乘以2
println(list.map(x => x * 2)) //使用anonymous func将list中的值乘以2,没return
println(list.map(x => return x * 2)) //使用anonymous func将list中的值都乘以2,有return
}
输出如下:
List(2, 4, 6)
List(2, 4, 6)
List(2, 4, 6)
只输出了三个预期结果,预期的第四个结果不见了。
这是为什么呢?
anonymous func中return的实现方式:NonLocalReturnException
scala是用抛出异常的方式来实现anonymous func中的return的,Scala Language Specification(下文简称SLS) 6.20 Return Expressions中的原文如下:
Returning from a nested anonymous function is implemented by throwing and catching a scala.runtime.NonLocalReturnException.
将上一节中的代码编译后生成的class文件,反编译成java代码,可以看到:
- 对于具名方法,如d1和d2,无论是否显式的用了return,在反编译出来的java代码中,都是用java return实现的。下面以d1的反编译代码为例:
//println(list.map(d1))
//创建了一个anonymous function,并在其apply方法中调用d1
Predef..MODULE$.println(list.map(new AbstractFunction1.mcII.sp() {
public static
final long serialVersionUID = 0L;
public int apply$mcII$sp(int n) {
return TestMain..MODULE$.com$iflytek$gnome$data$tmpsupport$main$TestMain$$d1$1(n);
}
public
final int apply (int n) {
return apply$mcII$sp(n);
}
}, List..MODULE$.canBuildFrom()));
//方法d1,仍是java的return
public
final int com$iflytek$gnome$data$tmpsupport$main$TestMain$$d1$1 (int n) {
return n * 2;
}
- 对于未包含return的anonymous function,在反编译后的代码中,则也是创建了一个anonymous func,并使用了java的return。
//创建了anonymous func,并在其apply方法中直接处理x * 2的逻辑。
Predef..MODULE$.println(list.map(new AbstractFunction1.mcII.sp()
{
public static final long serialVersionUID = 0L;
public int apply$mcII$sp(int x)
{
return x * 2;
}
public final int apply(int x)
{
return apply$mcII$sp(x);
}
}, List..MODULE$.canBuildFrom()));
- 对于包含return的anonymous function,则是抛出了异常。
Predef..MODULE$.println(list.map(new AbstractFunction1()
{
public static final long serialVersionUID = 0L;
private final Object nonLocalReturnKey1$1;
public final Nothing. apply(int x)
{
//这里有个x * 2,但是没有作为返回值,后面紧接着抛出异常了。
(x * 2);throw new NonLocalReturnControl.mcV.sp(this.nonLocalReturnKey1$1, BoxedUnit.UNIT);
}
}, List..MODULE$.canBuildFrom()));
执行流程并不如我们所想,代入1执行返回后,再代入2执行。而是在代入1后,就直接抛出了异常,这就是为什么最后一句println没有输出。其实,我们可以试着在最后一句println之后再加几句println("test")之类的,会发现也打印不出来。
到这里,我们知道了 ,anonymous function中的return是以NonLocalReturnException实现的。可,啥是non-local return?
non-local return
引用内容及有关代码来自知乎-怎么理解non-local return。
Non-local return是non-local control-flow的一种。
这里的non-local指的是:控制流并不转移到当前函数内的某个地方(return的话,并不从当前函数返回到其直接调用者),而是转移到更外层的调用者去。举例:
- local return
function f() {
g()
return
}
function g() {
return
}
这样从f()调用g(),g()里的return就属于local return——f()是g()的直接调用者,g()里的return将控制流转移到其直接调用者处。
- non-local return
function a() {
b() // non-local return if returns to after this call
return
}
function b() {
c(function () { //类似map(x => return x * 2)
return // <- the return in question
})
return
}
这个例子里
- 如果b()里的匿名函数出现的return在执行后是返回到c()里,那就是local return。
- 如果b()里的匿名函数出现的return在执行后是返回到a()里,那就是non-local return。
我们再来回顾一下前文中,在anonymous func中使用return的代码:
def main(args: Array[String]) {
val list = List(1, 2, 3)
println(list.map(x => return x * 2)) //使用anonymous func将list中的值都乘以2,有return
}
在反编译后的java代码中可以看到,异常打断了println,直接返回到main中。异常之后会被main的catch捕捉到。代码的大概流程是下面这个样子:
public void main(String[] args) {
Predef..MODULE$.println(list.map(new AbstractFunction1() {
...
throw new NonLocalReturnControl.mcV.sp(...);
...
}
catch (NonLocalReturnControl localNonLocalReturnControl) {}
也就是说,scala采用抛出异常的方式,实现了non-local return。
我们再从SLS来理解下这个non-local return。
SLS 6.20 Return Expressions说,一个return表达式,必须发生在一个named method里。可anonymous function明显是没名字的嘛。
你说等等,反编译后的java版本的anonymous function明明有个apply方法啊。可SLS还说了,这个apply方法是不算数的。所以,还是要往上层找啊。直到找到了main,因为main,是离这个return最近的named method(下文中将这个named method称为return的innermost enclosing method)。也就是说,return要直接把main中止掉,也就造就了non-local return。
可是,怎么中止啊?java的return(这里强调是java中的关键字return)是不行的,不过Exception很好使啊。所以,anonymous function中的return,反编译到java后,就成了异常了。
SLS中有关原文如下:
A return expression return e must occur inside the body of some enclosing named method or function.
An apply method which is generated by the compiler as an expansion of an anonymous function does not count as a named function in the source program, and therefore is never the target of a return expression.
原文中是named method or function,而我阐述时,只取了named method,而且我在全文中使用method和function比较多,而不是“方法”和“函数”,具体原因可参见文末function VS method一节。
找到return的innermost enclosing method
在上文的例子中,return的innermost enclosing method不就是main么?为啥这里还要“找到”?
再看一遍这个代码:
def main(args: Array[String]) {
val list = List(1, 2, 3)
println(list.map(x => return x * 2)) //使用anonymous func将list中的值乘以2,有return
}
我钻了个牛角尖。离return更近的,不是还有map和println嘛?它们都是named method啊?为啥它们不是return的innermost enclosing method呢?
- map和println是第三方的lib,class文件都是编译好的,再重新编译它们的class文件,将Exception加进去么?匪夷所思了。
- 这里的innermost enclosing method,是在definition层面的enclosing,而非execution层面的enclosing。
type of innermost enclosing method
我还想聊一聊,有关这个main的返回值类型问题。SLS说,这个innermost enclosing method的返回值类型,必须和return e中的e保持一致。不过有时候,e可能会被忽略,return e会被直接当成return ()。
我们来看几个例子:
- return e 被 当成 return()。直接拿上文的例子。return的是整数,不过main的返回值是unit,这种情况下,return x * 2就被当成return()了。
def main(args: Array[String]) {
val list = List(1, 2, 3)
println(list.map(x => return x * 2)) //使用anonymous func将list中的值乘以2,有return
}
- 明确定义main的返回值类型为Int。这样编译也能通过,不过因为要保证所有的分支都返回整型,所以最后加了个1(不过这段代码run不起来,因为最外面的那个main返回必须是unit(void),才能run起来)。
def main(args: Array[String]): Int = {
val list = List(1, 2, 3)
println(list.map(x => return x * 2)) //使用anonymous func将list中的值乘以2,有return
1
}
- 明确定义main的返回值类型为String。这下编译就出错了。
def main(args: Array[String]): String = {
val list = List(1, 2, 3)
println(list.map(x => return x * 2)) //使用anonymous func将list中的值乘以2,有return
1
}
spark失败在return的原因
说了这么多,最开始的问题,虽没有明确回答,却已现端倪。
猜测1
像rdd.map(return x => x * 2)这种东西,若真能执行的话,那么在代入第一条记录后,就会直接退出整批记录的处理了,明显与用户期望相去甚远啊。干脆毙了你。
在看猜测2之前,我们又得引用一下SLS了。它说,scala会确保这个异常一定是被return的innermost enclosing method捕捉到,如果不是,就会往外面传播。可有的时候,return执行时(也就是抛异常时),这个innermost enclosing method说不定已经先执行完了。那么,这个异常就只能一直往外抛了,直到把程序抛跪。
原文如下:
Returning from a nested anonymous function is implemented by throwing and catching a scala.runtime.NonLocalReturnException. Any exception catches between the point of return and the enclosing methods might see the exception. A key comparison makes sure that these exceptions are only caught by the method instance which is terminated by the return.
If the return expression is itself part of an anonymous function, it is possible that the enclosing instance of f has already returned before the return expression is executed. In that case, the thrown scala.runtime.NonLocalReturnException will not be caught, and will propagate up the call stack.
一个参考stackoverflow-Is non-local return in Scala new?的例子。
- 异常被成功捕获
执行下面的代码,安安静静就结束了。程序第一次调用g()的时候就退出了,根本没后面代码的事情。此时,return的innermost enclosing method是main,main捕获到异常后,一看,“啊,是我的异常”,然后就把异常默默吞掉了,然后就世界和平了。
def main(args: Array[String]) {
var g: () => Unit = () => return
g() //执行到这,程序就退出了
def f() { g = () => return }
f() // set g
g() // scala.runtime.NonLocalReturnControl$mcI$sp
}
- 异常抛出来了
把1中的代码的第一个g()去掉,如下。执行后你会发现,console里躺着一句:Exception in thread "main" scala.runtime.NonLocalReturnControl$mcV$sp
。
def main(args: Array[String]) {
var g: () => Unit = () => return //第一个anonymous function
def f() { g = () => return } //第二个anonymous function
f() // set g
g() // scala.runtime.NonLocalReturnControl$mcI$sp
}
其实,这段代码里有两个anonymous function,别看两个长得一模一样,看反编译后的代码就知道了,它们可不是同一个。而g呢?不过是引用了anonymous function的一个variable而已。
当程序最后调用g()的时候,执行是第二个anonymous function中的return。
问题来了。这第二个anonymous function中的return的inner most enclosing method是谁?是f。可f已经跑完了(f重新设置了g,却没有调用g)。此时,异常被抛给了main,main一看,“哎呀,这不是我的异常,继续扔吧”。然后,异常就躺在console里了。
猜测2
所以说,return还会导致另一个问题,编译的时候好好的,运行时却会抛出致命的异常(致命的意思就是把我们的程序搞跪了)。
而spark算子里的function实际都要分发到executor上才会执行。那会儿,在definition阶段给return找的innermost enclosing method,早不知去哪了吧。
不管怎样,到这里,也算是对本文开头提出的疑问,有一个交代了。
最后,聊一聊scala里的function和method。
function VS method
function:函数。
method:方法。
我们在用这两个词的时候,尤其是在java里,大部分情况下没啥区别。在SLS里,有时候,这两个词,用的也挺混的。
嗯,下面的内容又是参考的stackoverflow-Difference between method and function in Scala。
先来了解几个SLS中的概念:
- A Function Type is (roughly) a type of the form (T1, ..., Tn) => U.
- An Anonymous Function is an instance of a Function Type.
- A method type is a
def
declaration - everything about adef
except its body. - A method value actually has a Function Type
method就像是java中类的方法,而function则更倾向于用来指代一个object(类似java中的class)。This object has an apply
method which receives N parameters of types T1, T2, ..., TN, and returns something of type R.
我想,可没有所谓的named function。对了,上文中有这样的代码:
var g: () => Unit = () => return
这里的g,只是个Function Type的variable而已,可不是什么function。