我们这一篇介绍一下java8中的异步编程,其实在java5的时候,Future就已经被引入了,下面我们用一个例子说明下Future的简单使用,
public class FutureExample {
public static void main(String[] arg) throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService executorService = new ThreadPoolExecutor(2,10,30, TimeUnit.MINUTES,new ArrayBlockingQueue<>(10));
Future<Integer> result = executorService.submit(() -> doSomething());
System.out.println(result.get(10,TimeUnit.SECONDS));
}
public static Integer doSomething() throws InterruptedException {
TimeUnit.SECONDS.sleep(7);//模拟时间占用
return 666;
}}
这边我们使用了future来异步获得返回值,那我们考虑一下,如果我们现在有多个任务,任务之间还存在依赖关系的话,那么我们采用Future实现,代码是怎么样的呢,我们难免要使用Future提供给我们的isDone来判断,这样代码就会变得不简洁,接下来我们就引入一个新的类,CompleteFuture 来解决future不能解决的复杂需求.
我们这边假设这样一个场景,收银员使用一套系统来获取输入的水果的打折之后的价格,按照一般的代码写法:
public void delay() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);
}
/**
* get the product price in synchronized way
*
* @param product
* @return
* @throws InterruptedException
*/
public double getPrice(String product) throws InterruptedException {
return calculatePrice(product);
}
/**
* calculate the price.
*
* @param price
* @return
* @throws InterruptedException
*/
public double calculatePrice(String price) throws InterruptedException {
delay();//模拟数据库等操作
return new Random(100).nextDouble() * 100;//模拟价格
}
/**
* get the latest discount info
*
* @param product
* @return
*/
public double getLatestDiscountInfo(String product) throws InterruptedException {
TimeUnit.SECONDS.sleep(2);
return new Random(100).nextDouble();
}
那我们如何使用CompleteFuture来进行改造呢,看下面
public CompletableFuture<Double> getPriceAsync(String product) {
CompletableFuture<Double> futureResult = new CompletableFuture<>();
new Thread(() -> {
try {
double price = calculatePrice(product);
futureResult.complete(price);
} catch (InterruptedException e) {
futureResult.completeExceptionally(e);
}
}).start();
return futureResult;
}
看了一下这代码,感觉有点杂,单独开启一个线程来,如果请求很多,那么大量的线程将占用大量的资源,一种解决的办法是采用我们自己手工创建的线程池的方法,还有一种就是采用CompleteFuture给我们提供的静态方法,如下 :
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor with the value obtained
* by calling the given Supplier.
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new CompletableFuture
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
改造之后:
public CompletableFuture<Double> getPriceAsync(String product) {
//we can refactor above code to below
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
Double v = null;
try {
v = calculatePrice(product);
} catch (InterruptedException e) {
e.printStackTrace();
}
return v;
}, threadExecutor);
return future;
}
再深入CompleteFuture, 我们观察到其内部还提供这些函数。
我们可以猜测到,其功能可能远远不止我们上面介绍的那些,下面我们就简单的使用下其中一个方法。
public static void testCombine() throws ExecutionException,InterruptedException {
PriceService priceService = new PriceService();
Future<Double> future = CompletableFuture.supplyAsync(() -> {
return priceService.getPrice("apple");
}).thenCombine(CompletableFuture.supplyAsync(() -> {
return priceService.getPrice("Grape");
}
), (priceOne, priceTwo) -> priceOne + priceTwo);
System.out.println("The sum of the price of apple and Grade is" + future.get());
}