一、引言
1. FutureTask在高并发场景下能确保任务只执行一次吗?
2. 任务还在执行的时候用户调用cancel能否让任务停止执行?
二、功能简介
FutureTask是一种异步任务(或异步计算),举个栗子,主线程的逻辑中需要使用某个值,但这个值需要负责的运算得来,那么主线程可以提前建立一个异步任务来计算这个值(在其他的线程中计算),然后去做其他事情,当需要这个值的时候再通过刚才建立的异步任务来获取这个值,有点并行的意思,这样可以缩短整个主线程逻辑的执行时间。
与1.6版本不同,1.7的FutureTask不再基于AQS来构建,而是在内部采用简单的Treiber Stack来保存等待线程。
三、前置知识
LockSupport
LockSupport是用来创建锁及其他同步类的基本线程阻塞元素,它的park和 unpark能够分别阻塞线程和解除线程阻塞。它提供了可以指定阻塞时长的park方法。park和unpark的基本接口为:
public static void park() {
unsafe.park(false, 0L);
}
public static void unpark(Thread thread) {
if (thread != null)
unsafe.unpark(thread);
}
Unsafe
Java不能够直接访问操作系统底层,而是通过本地方法来访问。Unsafe提供了硬件级别的原子访问,主要提供一下功能:
1. 分配释放内存
2. 定位某个字段的内存位置
3. 挂起一个线程和恢复,更多的是通过LockSupport来访问。park和unpark
4. CAS操作,比较一个对象的某个位置的内存值是否与期望值一致,一致则更新对应值,此更新是不可中断的。主要方法是compareAndSwap*
并发工具三板斧
状态,队列,CAS
四、源码分析
1. FutureTask介绍
FutureTask是一种可以取消的异步的计算任务。它的计算是通过Callable实现的,可以把它理解为是可以返回结果的Runnable。
使用FutureTask的优势有:
可以获取线程执行后的返回结果;
提供了超时控制功能。
它实现了Runnable接口和Future接口:
2. FutureTask的状态
在FutureTask中,状态是由state属性来表示的,不出所料,它是volatile类型的,确保了不同线程对它修改的可见性:
private volatile int state;
private static final int NEW =0;
private static final int COMPLETING =1;
private static final int NORMAL =2;
private static final int EXCEPTIONAL =3;
private static final int CANCELLED =4;
private static final int INTERRUPTING =5;
private static final int INTERRUPTED =6;
状态转换路径
栈结构
run方法
public void run(){
/*
* 首先判断状态,如果不是初始状态,说明任务已经被执行或取消;
* runner是FutureTask的一个属性,用于保存执行任务的线程,
* 如果不为空则表示已经有线程正在执行,这里用CAS来设置,失败则返回。
*/
if(state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
return;
try{
Callable<V> c = callable;
// 只有初始状态才会执行
if(c !=null&& state == NEW) {
V result;
boolean ran;
try{
// 执行任务
result = c.call();
// 如果没出现异常,则说明执行成功了
ran =true;
}catch(Throwable ex) {
result =null;
ran =false;
// 设置异常
setException(ex);
}
// 如果执行成功,则设置返回结果
if(ran)
set(result);
}
}finally{
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 无论是否执行成功,把runner设置为null
runner =null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
ints = state;
// 如果被中断,则说明调用的cancel(true),
// 这里要保证在cancel方法中把state设置为INTERRUPTED
// 否则可能在cancel方法中还没执行中断,造成中断的泄露
if(s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
run方法总结
校验当前任务状态是否为NEW以及runner是否已赋值。这一步是防止任务被取消。
double-check任务状态state
执行业务逻辑,也就是c.call()方法被执行
如果业务逻辑异常,则调用setException方法将异常对象赋给outcome,并且更新state值
如果业务正常,则调用set方法将执行结果赋给outcome,并且更新state值
awaitDone方法
// 第一次循环:创建栈头结点
// 第二次循环: 入栈操作
// 第三次循环: 开始阻塞,等待通知
// 队列针对一个FutureTask示例
// 一个FutureTask示例的多个线程,依次入栈,通过next节点链接,后面再依次唤醒
private int awaitDone(boolean timed, long nanos)throws InterruptedException {
// 计算到期时间
final long deadline = timed ? System.nanoTime() + nanos :0L;
WaitNode q =null;
boolean queued =false;
for (; ; ) {
// 如果被中断,删除节点,抛出异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 如果任务执行完毕并且设置了最终状态或者被取消,则返回
if (s > COMPLETING) {
if (q !=null)
q.thread =null;
return s;
}
// s == COMPLETING时通过Thread.yield();让步其他线程执行,
// 主要是为了让状态改变
else if (s == COMPLETING)// cannot time out yet
Thread.yield(); // 主动让出CPU
// 创建一个WaitNode
else if (q ==null)
q =new WaitNode(); // 第一次头节点
// CAS设置栈顶节点
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 入栈操作
// 如果设置了超时,则计算是否已经到了开始设置的到期时间
else if (timed) {
nanos = deadline - System.nanoTime();
// 如果已经到了到期时间,删除节点,返回状态
if (nanos <=0L) {
removeWaiter(q);
return state;
}
// 阻塞到到期时间
LockSupport.parkNanos(this, nanos);
}
// 如果没有设置超时,会一直阻塞,直到被中断或者被唤醒
else
LockSupport.park(this);
}
}
awaitDone方法总结
计算deadline,也就是到某个时间点后如果还没有返回结果,那么就超时了。
进入自旋,也就是死循环。
首先判断是否响应线程中断。对于线程中断的响应往往会放在线程进入阻塞之前,这里也印证了这一点。
判断state值,如果>COMPLETING表明任务已经取消或者已经执行完毕,就可以直接返回了。
如果任务还在执行,则为当前线程初始化一个等待节点WaitNode,入等待队列。这里和AQS的等待队列类似,只不过Node只关联线程,而没有状态。AQS里面的等待节点是有状态的。
计算nanos,判断是否已经超时。如果已经超时,则移除所有等待节点,直接返回state。超时的话,state的值仍然还是COMPLETING。
如果还未超时,就通过LockSupprot类提供的方法在指定时间内挂起当前线程,等待任务线程唤醒或者超时唤醒。
入栈示意图
五、总结
FutureTask是线程安全的,在多线程下任务也只会被执行一次;
注意在执行时各种状态的切换;
get方法调用时,如果任务没有结束,要阻塞当前线程,法阻塞的线程会保存在一个Treiber Stack中;
get方法超时功能如果超时未获取成功,会抛出TimeoutException;
注意在取消时的线程中断,在run方法中一定要保证结束时的状态是INTERRUPTED,否则在cancel方法中可能没有执行interrupt,造成中断的泄露。