1. 共同点
异步编程: 这三种机制都用于处理异步操作,即在不阻塞当前线程的情况下执行任务,并在任务完成后通知结果。
-
结果获取: 所有三种机制都提供获取异步操作结果的方法,例如:
Future.get()
CompletionStage.toCompletableFuture().join()
CompletableFuture.join()
-
错误处理: 所有三种机制都提供处理异步操作中发生的异常的方法,例如:
-
Future.get()
可能会抛出异常。 -
CompletionStage.exceptionally()
和CompletableFuture.exceptionally()
可以捕获异常。
-
2. 优势
Future
-
简单易用:
Future
是 Java 1.5 中引入的,相对来说比较简单,易于理解。 -
基础机制:
Future
是异步操作的核心,其他的机制,例如CompletionStage
和CompletableFuture
都是基于Future
实现的。
CompletionStage
-
灵活的组合和处理:
CompletionStage
提供了丰富的组合方法,例如thenApply
、thenAccept
、thenCompose
等,可以方便地将多个异步操作进行组合和处理,以满足复杂业务需求。 -
链式调用:
CompletionStage
支持链式调用,可以将多个异步操作连接起来,形成一个完整的异步操作流程,简化代码编写。
CompletableFuture
-
功能更丰富:
CompletableFuture
提供了更加丰富的方法,除了CompletionStage
接口中定义的方法之外,还提供了complete
、completeExceptionally
、join
等方法,可以更加灵活地控制异步操作。 -
高效:
CompletableFuture
的实现更加高效,因为它使用了一些优化技术,例如延迟初始化和缓存。
3. 示例代码
3.1、Future
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class FutureExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
// 模拟耗时操作
Thread.sleep(1000);
return "Hello, Future!";
});
// 获取结果
String result = future.get();
System.out.println("Result: " + result);
executor.shutdown();
}
}
3.2、CompletionStage
import java.util.concurrent.CompletableFuture;
public class CompletionStageExample {
public static void main(String[] args) {
CompletionStage<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, CompletionStage!";
});
future.thenApply(s -> s.toUpperCase())
.thenAccept(System.out::println)
.exceptionally(throwable -> {
System.err.println("Error occurred: " + throwable.getMessage());
return null;
});
}
}
进阶用法
3.3、、将两个 CompletionStage 的结果合并在一起
我们以一个常见的场景 — 网络请求为例,来说明 CompletionStage 的链式调用如何简化代码:
场景描述:
假设你需要从两个不同的 API 获取数据,然后将这两个数据进行合并处理,最后将合并后的数据展示给用户。
传统方法:
// 使用传统的回调方式
public void fetchAndProcessData() {
// 第一个 API 请求
api1.fetch(new Callback<Data1>() {
@Override
public void onSuccess(Data1 data1) {
// 第二个 API 请求
api2.fetch(new Callback<Data2>() {
@Override
public void onSuccess(Data2 data2) {
// 合并数据
ProcessedData processedData = processData(data1, data2);
// 展示结果
showResult(processedData);
}
@Override
public void onFailure(Throwable throwable) {
// 处理错误
handleError(throwable);
}
});
}
@Override
public void onFailure(Throwable throwable) {
// 处理错误
handleError(throwable);
}
});
}
使用 CompletionStage 的方法:
// 使用 CompletionStage 进行链式调用
public void fetchAndProcessData() {
CompletionStage<Data1> data1Future = api1.fetch();
CompletionStage<Data2> data2Future = api2.fetch();
data1Future.thenCombine(data2Future, this::processData) // 合并数据
.thenAccept(this::showResult) // 展示结果
.exceptionally(this::handleError); // 处理错误
}
private ProcessedData processData(Data1 data1, Data2 data2) {
// 合并数据逻辑
// ...
return processedData;
}
private void showResult(ProcessedData processedData) {
// 展示结果逻辑
// ...
}
private Throwable handleError(Throwable throwable) {
// 处理错误逻辑
// ...
return throwable;
}
3.4、将 CompletionStage 的结果作为另一个 CompletionStage 的输入
示例:
假设我们有一个数据库查询操作,需要先获取用户 ID,然后再根据用户 ID 获取用户信息。我们可以使用 CompletionStage 的 thenCompose 方法来实现这个操作:
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
public class CompletionStageChain {
public static void main(String[] args) {
// Step 1: 获取用户信息
CompletableFuture<String> user = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "用户A";
});
// Step 2: 使用用户信息获取订单列表
Function<String, CompletableFuture<String>> getOrders = userStr -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "订单列表:" + userStr + "的订单信息";
});
// Step 3: 将用户信息和订单列表拼接在一起
Function<String, String> joinUserAndOrders = orders -> "用户信息:" + user.join() + "\n" + orders;
// 将多个 CompletionStage 连接起来
user.thenCompose(getOrders)
.thenApply(joinUserAndOrders)
.thenAccept(System.out::println)
.exceptionally(throwable -> {
System.err.println("Error occurred: " + throwable.getMessage());
return null;
});
}
}
解释:
- 在上面的示例中,我们使用
thenApply
将user
的结果作为getOrders
函数的输入,从而获得订单列表
。 - 接着,我们使用
thenApply
将订单列表
作为joinUserAndOrders
函数的输入,从而将用户信息和订单列表拼接在一起。 - 最后,我们使用
thenAccept
将最终结果输出到控制台。
简化代码
- 使用
CompletionStage
的链式调用,我们可以将多个异步操作连接起来,形成一个完整的流程,代码更加简洁易读。 - 相比于手动管理多个
Future
对象,CompletionStage
的链式调用可以更方便地处理异步操作的组合和处理,提高代码的可读性和可维护性。
4.执行效率
细心者会发现,上面经常用到CompletableFuture.supplyAsync 方法来开始一个异步任务,也恰恰就是在supplyAsync中,会使用一个异步线程来执行传入的 Supplier 函数。但是,这个异步线程并非由 CompletableFuture 自己创建,而是来自于 Java 中的 ForkJoinPool。
ForkJoinPool
定义: ForkJoinPool 是 Java 提供的一个线程池,专门设计用于执行可分解为更小任务的并行任务。
工作原理: ForkJoinPool 使用了一种分治的思想,将一个任务分解为多个子任务,并使用多个线程并行执行这些子任务,最终将结果合并,实现高效的并行计算。
使用场景: ForkJoinPool 适用于处理那些可以分解为更小任务的任务,例如:
- 并行计算
- 海量数据处理
- 递归算法
CompletableFuture.supplyAsync 和 ForkJoinPool
CompletableFuture.supplyAsync 方法默认使用 ForkJoinPool.commonPool() 来执行异步任务。
ForkJoinPool.commonPool() 是一个共享的 ForkJoinPool,它由所有使用 ForkJoinPool 的类共享使用。
如果你需要更精细的控制,你也可以通过 CompletableFuture.supplyAsync(supplier, executor) 方法指定一个自定义的线程池来执行异步任务。
5. 总结
-
Future
是一个基础的异步操作接口,简单易用。 -
CompletionStage
提供了更加灵活的异步操作组合和处理方法,支持链式调用,简化代码编写。 -
CompletableFuture
在CompletionStage
的基础上提供了更丰富的功能,更方便的控制和操作,并且更加高效。
根据实际需求选择合适的异步操作机制,可以有效提高程序的效率和可维护性。