Future 模式只是生产者-消费者模型的扩展。经典“生产者-消费者”模型中消息的生产者不关心消费者何时处理完该条消息,也不关心处理结果。Future模式则可以让消息的生产者等待直到消息处理结束,如果需要的话还可以取得处理结果。
简单来讲,Future是这样一种模式: 它本身表示‘将来(future)’,你提交一个异步的任务,比如提交到一个threadpool,与此同时拿到一个Future对象,任务的执行是异步的,这时候你可以去做其它的事情,等到异步任务结束的时候,你可通过前面的Future对象拿到异步执行的任务的结果。
一个简单的例子来直观感受一下Future:
public class AddTask implements Callable<Integer> {
private int a, b;
public AddTask(int a, int b) {
this.a = a;
this.b = b;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(new AddTask(1, 2));
// 假设现在做其他事情
Thread.sleep(5000);
// 只有当future的状态是已完成时(future.isDone = true),get方法才会返回
if (future.isDone()) {
System.out.print(future.get());
}
}
@Override
public Integer call() throws Exception {
return a + b;
}
}
Future要获取异步任务执行的结果,需要通过轮询或者阻塞等待的方式,这样的方式,总显得不太‘完美’,比较好的做法,应该是异步执行结束后,去通知用户异步任务结束了,你可以通过Future来获取执行结果了。
ListenableFuture
Google Guava 包定义了 ListenableFuture接口并继承了JDK concurrent包下的Future 接口。它可以允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后立即执行。
创建
对应JDK中的 ExecutorService.submit(Callable) 提交多线程异步运算的方式,Guava 提供了ListeningExecutorService 接口, 该接口返回 ListenableFuture。
将 ExecutorService 转为 ListeningExecutorService,可以使用MoreExecutors.listeningDecorator(ExecutorService)进行装饰。
ListeningExecutorService executor = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
final ListenableFuture< String> future = executor.submit(
new Callable< String>() {
public String call() throws Exception {
return "Hello ListenableFuture";
}
});
方法
ListenableFuture 中的基础方法是addListener(Runnable, Executor), 该方法会在多线程运算完的时候,指定的Runnable参数传入的对象会被指定的Executor执行。
future.addListener(new Runnable() {
public void run() {
try {
System.out.println(future.get());
} catch (InterruptedException e) {
future.cancel(true);
} catch (ExecutionException e) {
future.cancel(true);
}
}
}, Executors.newFixedThreadPool(1));
使用addListener时,用户需要在注册的回调函数中处理InterruptedException和ExecutionException, 显得略为麻烦。这里Guava还提供了另为一种使用方式:
添加回调(Callbacks)
显式的提供了用户线程池,用来执行回调函数。
Futures.addCallback(future, new FutureCallback< String>() {
public void onSuccess(String result) {
System.out.println(result);
}
public void onFailure(Throwable t) {
System.out.println("onFailure: " + t);
}
}, Executors.newFixedThreadPool(1));
也可以使用默认方式,默认是采用 MoreExecutors.sameThreadExecutor()线程池
Futures.addCallback(ListenableFuture<V> future,FutureCallback<? super V> callback)
FutureCallback<V> 中实现了两个方法:
onSuccess(V),在Future成功的时候执行,根据Future结果来判断。
onFailure(Throwable), 在Future失败的时候执行,根据Future结果来判断。