结论
今天先说结论吧:
- execute:1.只能提交Runnable接口的对象,我们知道Runnable接口的run方法是没有返回值的。2.execute方法提交的任务异常是直接抛出的
- submit: 1.可以提交Runnable也可以提交Callable对象,即可以有返回值,也可以没有。2.submit方法是是捕获了异常的,当调用FutureTask的get方法时,才会抛出异常。
例子还是在github中
先对比api
//只接受Runnable对象没有返回值
public void execute(Runnable command)
//接受Runnable对象并返回Future对象,调用Future的get时,正常完成后,返回null
public Future<?> submit(Runnable task)
//接受Runnable对象并返回Future对象,调用Future的get时,正常完成后,返回指定的result对象
public <T> Future<T> submit(Runnable task, T result)
//接受Callable对象并返回Future对象,调用Future的get时,正常完成后,返回task的结果
public <T> Future<T> submit(Callable<T> task)
execute方法上篇文章已经分析过了,这篇我们主要看下submit方法的实现,submit三个方法实现类似,我们用其中一个举例。
public Future<?> submit(Runnable task) {
//task对象不能为空
if (task == null) throw new NullPointerException();
//将task构造成一个RunnableFuture对象
RunnableFuture<Void> ftask = newTaskFor(task, null);
//调用execute方法执行RunnableFuture对象
execute(ftask);
//返回future对象
return ftask;
}
可以看到,submit复用了execute方法,submit仅仅将传入的Runnable或者Callable构造成一个RunnableFuture,我们一起看看newTaskFor方法
//构造FutureTask返回
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
可以看到主要的不同,就是这个FutureTask,我们看看FutureTask吧。
public class FutureTask<V> implements RunnableFuture<V> {
}
public interface RunnableFuture<V> extends Runnable, Future<V>
通过上面可以看到FutureTask是实现了Runnable和Future两个接口,我们知道Future接口提供了cancel,isCancelled,isDone,get方法,用于获取子线程执行的结果。
FutureTask类结构
public class FutureTask<V> implements RunnableFuture<V> {
/**
* 可能出现的状态路径
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
//任务执行的状态
private volatile int state;
private static final int NEW = 0; //初始化状态
private static final int COMPLETING = 1; //任务正常结束或者抛出异常时的中间态 这是一个过渡状态
private static final int NORMAL = 2; //任务正常结束
private static final int EXCEPTIONAL = 3; //任务异常结束
private static final int CANCELLED = 4; //任务被取消, 注意任务只有在new状态的时候,才可以被取消
private static final int INTERRUPTING = 5; //任务到中断状态的中间态
private static final int INTERRUPTED = 6; //任务被中断
//执行任务的callable对象,可以看到传入的runnable也会被构造成callable
private Callable<V> callable;
//正常返回时的结果或者任务抛出的异常
private Object outcome; // non-volatile, protected by state reads/writes
//运行callable的线程
private volatile Thread runner;
//等待的线程节点,调用get方法时,如果线程没有运行结束,需要进入等待队列进行等待
private volatile WaitNode waiters;
。。。
}
我们上面看到ThreadPoolExecutor调用的是传入Runnable的构造方法。
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
可以看到构造方法中将传入的Runnable对象,适配成了RunnableAdapter对象,另外将state设置成NEW的状态。
通过上篇的分析,我们知道execute最终执行的是Runnable的run方法,在这里即FutureTask的run方法。
public void run() {
//状态不为初始状态或者CAS替换runner为本线程失败,都直接返回,说明任务被别人处理了
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
//将runner设置为自己后,任务被自己抢到了
//任务不为空且是初始状态
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用Callable对象的call方法
result = c.call();
//正常执行完成
ran = true;
} catch (Throwable ex) {
//Callable对象的call方法抛出了异常
result = null;
ran = false;
//将异常对象放到outcome属性中,标记任务为异常状态,唤醒等待的线程
//这里可以看到对异常进行了捕获,也就是文章开头的结论2
setException(ex);
}
//如果Callable对象的call方法正常执行完成
//将正常结果放到outcome属性中,标记任务为正常完成的状态,唤醒等待的线程
if (ran)
set(result);
}
} finally {
runner = null;
// 再次处理INTERRUPTING状态
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
setException 和set方法是两种不同情况的结束,但代码类似,我们看看正常结束的set方法
protected void set(V v) {
//标记state为COMPLETING状态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//将正常结果赋值给outcome
outcome = v;
//标记状态为NORMAL(正常结束)
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//唤醒等待线程
finishCompletion();
}
}
finishCompletion方法
private void finishCompletion() {
// assert state > COMPLETING;
//循环唤起全部的等待线程,后面会看到其他线程调用get方法,如果任务还没完成会进入等待队列
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
前面我们看到set方法或者setException最后都会进行唤醒挂起的线程,现在就一起看看get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
//如果任务的状态为未完成的状态,进行等待
//注意:这里的等待线程和执行任务的线程不是同一个哦,可以想象主线程new 线程池执行任务,一般都是在主线程中获取任务的结果,就是调用的get方法
if (s <= COMPLETING)
s = awaitDone(false, 0L);
//任务完成以后,根据任务的状态,返回正常的执行结果,或者
return report(s);
}
awaitDone方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//响应中断的等待
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
//如果任务已经完成直接返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 任务在中间态,说明应该很快就能出结果,做一个让步,就不在挂起了
else if (s == COMPLETING)
Thread.yield();
else if (q == null) //构造一个等待节点
q = new WaitNode();
else if (!queued)
//将等待节点入队
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
//如果是超时等待,判断是否已经超时,超时了直接返回
//没超时,进行剩余时间的挂起(有超时时间的挂起是可以自唤醒的)
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
//挂起自己, 这里就和finishCompletion方法里的LockSupport.unpark对上了
LockSupport.park(this);
//注意:外层无限循环的操作,所以串起来就是构造一个等待节点,再次进入for循环,将节点进入等待队列,再次进入for循环挂起自己。
//当被唤醒后,再次进入for循环,判断state> COMPLETING, 方法返回
}
}