前言
上篇文章已经讲到了LockSupport提供的功能,以及如何使用LockSupport实现锁的语义,本文将介绍Future的语义以及如何利用LockSupport实现Future。
Future语义
大概应该是从Java1.5开始引入了Future接口,其他很多框架或者工具包都或多或少的引入了Future或者Promise(其他很多语言也引入了Future或者Promise的类似语义)。Future本身体现的可能是一种异步的思想。大体可以描述成“在将来的某个时候可以获取某个结果,可以获取任务的完成状态”。Future的接口定义如下:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
}
接口的语义描述如下:
- boolean cancel(boolean mayInterruptIfRunning); 取消该任务,mayInterruptIfRunning表示的是如何处理该任务的线程还在执行,会尝试interrupt该线程(Java里的thread.interrupt只是会将interrupted状态置位)
- boolean isCancelled(); 任务是否已经被取消
- V get() throws InterruptedException, ExecutionException; 该方法是个阻塞方法,线程阻塞到任务完成或者取消(可以多个线程进行阻塞等待该任务完成),如果等待的线程被Interrupt了会抛出InterruptedException异常,如果task执行异常,会抛出ExecutionException
- V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException; 该方法是个阻塞方法,线程阻塞一定时间等待任务完成,如果任务没完成将抛出TimeoutException, 如果等待的线程被Interrupt了会抛出InterruptedException异常,如果task执行异常,会抛出ExecutionException
如何实现Future
类似于锁一样,由于Future语义中包含了多个线程同时可以调用get()方法进行阻塞等待,所以Future中需要维护一个等待线程的队列。单纯针对于Future接口来说,其中并没有指定set方法,所以Future实现中需要提供set()方法来描述任务的处理完成(可能是由于出现异常提前完成),而且Future中还需要维护是否被取消,是否完成等状态。
对于java1.5以后来说,JUC中已经提供了FutureTask的实现,如FutureTask名字描述的一样,FutureTask是一个拥有Future特性的task,该task是一个可执行的Runnable对象,所以FutureTask天然知道自己什么时候被处理完成(Runnable执行完成或者出现异常),所以FutureTask不需要透出任何set形式的方法,而且FutureTask天然了解自己是被哪个线程所执行,所以当执行cancel(boolean mayInterruptIfRunning)方法时可以中断这个执行线程。
下文中,基于LockSupport实现了一个简单的Future,该Future不是一个Runnable,只是一个helper,可以用来给某些class加上Future语义。例如在RPC实现中通常会有
ResponseFuture request(Request request) throws RpcException这种方法,这其中的ResponseFuture就可以借助以下的Future实现Future语义。
public class FutureUsingLockSupport<V> implements Future<V> {
private final AtomicInteger state = new AtomicInteger(NEW);
public static final int NEW = 0;
public static final int COMPLETED = 1;
public static final int CANCELLED = 3;
public static final int EXCEPTIONAL = 2;
private final ConcurrentLinkedQueue<Thread> waitThreads = new ConcurrentLinkedQueue<Thread>();
private volatile Object result;//may be Throwable
//取消的语义,没相应中断
public boolean cancel(boolean mayInterruptIfRunning) {
int s = state.get();
if (!(s == NEW && state.compareAndSet(NEW, CANCELLED))) {
return false;
} else {
//这里本该需要interrupt运行的task,但是此场景下没法获知Running的Thread
finish();
}
return true;
}
//是否被取消
public boolean isCancelled() {
return (state.get() >= CANCELLED);
}
//是否完成
public boolean isDone() {
return (state.get() > NEW);
}
//可能阻塞,处理InterruptedException
public V get() throws InterruptedException, ExecutionException {
int s = state.get();
if (s < COMPLETED) {
s = awaitDone(false, 0L);
}
return report(s);
}
//可能阻塞及超时,处理InterruptedException
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
int s = state.get();
if (s < COMPLETED && (s = awaitDone(true, unit.toNanos(timeout))) < COMPLETED ) {
throw new TimeoutException();
}
return report(s);
}
public void set(V value) {
if (state.compareAndSet(NEW, COMPLETED)) {
//
result = value;
finish();
}
}
private void finish() {
//记性unpark操作
if (!waitThreads.isEmpty()) {
for (Thread thread : waitThreads) {
LockSupport.unpark(thread);
}
}
}
public void setException(Throwable throwable) {
if (state.compareAndSet(NEW, EXCEPTIONAL)) {
result = throwable;
finish();
}
}
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
if (s == COMPLETED) {
return (V) result;
}
if (s >= CANCELLED) {
throw new CancellationException();
}
throw new ExecutionException((Throwable) result);
}
//等待一定时间
private int awaitDone(boolean timeout, long nanos) throws InterruptedException {
Thread currentThread = Thread.currentThread();
long deadLine = timeout ? nanos + System.nanoTime() : 0L;
boolean added = waitThreads.contains(currentThread);
while (true) {
//首先检查是否interrupt()
if (Thread.interrupted()) {
waitThreads.remove(currentThread);
throw new InterruptedException();
}
if (state.get() >= COMPLETED) {
//已经完成
return state.get();
} else if (!added) {
added = waitThreads.add(currentThread);
} else if (timeout) {
long left = deadLine - System.nanoTime();
if (left <= 0L) {
//超时
waitThreads.remove(currentThread);
return state.get();
}
LockSupport.parkNanos(left);
} else {
LockSupport.park();
}
}
}
}
总结
本文利用LockSupport实现了一个简单的Future,可以作为Helper类帮助其他类完成Future语义。