JDK Future接口
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
-
idDone()
方法返回 true 的情况:- 成功完成
- 取消
- 发生异常
-
get()
是阻塞方法,会等待完成。
FutureTask类
FutureTask
类实现了RunnableFuture
接口,该接口即继承了Future
接口,又继承了Runnable
接口,代表一个有返回结果的、可执行的任务。
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
FutureTask
类的构造函数支持Runnable
和Callable
接口的实现类,其中Runnable
实例通过工具类Executors.callable
方法转换为Callable
实例,并赋值给实例变量callable
。
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
}
FutureTask
的实例可以提交到ExecutorService
中执行。例如:
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1 + 2;
}
});
System.out.println(future.get());
}
RunableAdapter的适配器模式
看一下Executors.callable
的实现,创建了一个RunnableAdapter
实例:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
RunnableAdapter
类是工具栏Executors
的静态内部类,实现了Callable
接口定义的call
方法。它持有Runnable
类型的任务task
对象和返回结果result
:
static final class RunnableAdapter<T> implements Callable<T> {
//持有目标对象
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
call
方法的执行逻辑是调用task
对象的run
方法,然后将传入的结果result
返回。
Netty提供的Future接口
Netty的Future
接口继承了JDK的Future
接口,同时提供了更多的方法:
public interface Future<V> extends java.util.concurrent.Future<V> {
boolean isSuccess();
Throwable cause();
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> sync() throws InterruptedException;
Future<V> await() throws InterruptedException;
V getNow();
}
任务成功完成后
isSuccess()
返回true
任务执行过程中有异常,
cause()
会返回异常对象任务被取消执行,父接口方法
isCancelled
返回true-
以上3种情况
isDone()
均为true
//任务完成 if (task.isDone()) { if (task.isSuccess()) { // 成功 } else if (task.isCancelled()) { // 被取消 } else { // 异常 System.out.print(task.cause()) } }
await
和sync
都会阻塞,并等待任务完成getNow()
不会阻塞,会立即返回,但任务尚未执行完成时,会返回null
addListener
方法在当前Future
对象中添加监听器,当任务完成时,会通知所有的监听器。
ChannelFuture接口
ChannelFuture
继承了Netty的Future
接口,代表 Netty channel
的I/O操作的执行结果。在Netty中所有的I/O操作都是异步的,会立即返回一个代表I/O操作的结果,即ChannelFuture
。
在获得执行结果时,推荐使用添加监听器,监听执行完成事件
operaionCompleted
,而不要使用await
方法。
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
//当任务完成时,会被调用
void operationComplete(F future) throws Exception;
}
不能在
ChannelHandler
中调用await
,会造成死锁。因为ChannelHandler
中的方法通常是I/O线程调用的,再调用await
会造成I/O阻塞。
//错误
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelFuture future = ctx.channel().close();
future.awaitUninterruptibly();
// Perform post-closure operation
// ...
}
// 正确
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelFuture future = ctx.channel().close();
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
// Perform post-closure operation
// ...
}
});
}
即使是通过添加
ChannelFutureListener
的方式获取执行结果,但要注意的是:回调方法operationComplete
也是由I/O线程调用的,所以也不能在其中执行耗时任务。如必须,则启用线程池执行。
ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerInitializer())
.bind(8899)
.sync();
bind
方法是异步的,其返回值是ChannelFuture
类型。需要调用sync()
同步方法,等待绑定动作执行完成。