1.概览
java8引入了流的概念,流是作为一种对数据执行大量操作的有效方式。并行流可以被包含于支持并发的环境中。这些流可以提高执行性能-以牺牲多线程的开销为代价
在这篇短文中,我们将看一下 Stream API的最大限制,同时看一下如何让并行流和线程池实例(ThreadPool instance)一起工作。
2.并行流Parallel Stream
我们先以一个简单的例子来开始-在任一个Collection类型上调用parallelStream方法-它将返回一个可能的并行流。
@Test
publicvoidgivenList_whenCallingParallelStream_shouldBeParallelStream(){
List aList = newArrayList<>();
Stream parallelStream = aList.parallelStream();
assertTrue(parallelStream.isParallel());
}
这样的流的默认处理流程是使用ForkJoinPool.commonPool(),这是一个被整个应用程序所共享的线程池。
3.自定义线程池
在处理流的时候,我们可以传递自定义一个线程池。下面的例子中,我们有一个并行流,这个并行流使用了一个自定义的线程池去计算1到 1,000,000的和:
@Testpublic void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal() throws InterruptedException, ExecutionException { long firstNum = 1; long lastNum = 1_000_000; List aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());
ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}
我们使用ForkJoinPool的构造方法并设定并行级别为4去创建一个线程池。要想确定不同环境的最优值(optimal),我们需要试验一下。一个好的做法就是,基于你CPU的核数来确定并行级别的数值。
4.总结
我们简要地看了一下,如何使用一个自定义的Thread Pool运行并行流。只要在正确的环境中配置了合适的平行级别,就能在确定的情况下获得较高的执行性能。
本文中引用的完整代码都可在github上找到:ON Github