本篇是简单介绍如何自定义线程池并在Java 8 的Parallel Streams中使用线程池。并举例介绍如何不使用普通线程池而是使用自定义线程池来调用Parallel streams API。
1. 介绍
在本篇教程中,您将学习如何使用强大的并行流API(在Java8中)创建用于批量数据处理的自定义线程池。
并行流可以在并发环境中运行,这是以多线程开销为代价的streams性能的改进版本。
本文重点关注Stream API的最大限制并举例说明如何将并行流与自定义线程池结合使用。
public class CustomPootParallelStreams {
public static void main(String[] args) throws ExecutionException, InterruptedException {
parallelStreamProcess();
}
private static void parallelstreamProcess() throws ExecutionException, InterruptedException {
int start = 1;
int end = 10000;
List<Integer> intList = IntStream.rangeClosed(start, end).boxed()
.collect(Collectors.toList());
System.out.println(intList.size());
ForkJoinPool newCustomThreadPool = new ForkJoinPool(5);
int actualTotal = newCustomThreadPool.submit(
() -> {
int a = intList.stream().reduce(0, Integer::sum).intValue();
System.out.println("------" + a);
return a;
}).get();
System.out.println("actualTotal " + actualTotal);
}
}
2. Java 8 并行流
首先,让我们看看如何从一个集合中创建并行流。
为了使一个流可以在多核处理器中运行,你只需要调用parallelStream()
方法来创建并行流。
package com.javaprogramto.java8.streams.parallel.streams;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
public class ParallelStreamCreation {
public static void main(String[] args) {
List<Integer> intList = Arrays.asList(10, 20, 30, 40, 50);
Stream<Integer> parallelStream = intList.parallelStream();
parallelStream.forEach(value -> System.out.println(value));
}
}
Output:
[30
40
50
20
10]
你可以观察到输出结果是多核处理器随机打印出来的。在内部,并行流使用SplitIterator和StreamSupport类使其并行运行。
并行流的默认的处理过程是用 ForkJoinPool.commonPool()
来创建的线程池,这样创建的线程池会被整个应用所共享。如果你同时运行大量的并行流,则可能会看到处理时间的性能和延迟。
3. 使用自定义线程池
上面的操作并行流将会使用普通的ForkJoinPool 线程池。
如果你有许多并行流需要同时运行并且其中一些并行流可能会因为网络延迟导致处理时长超出预期,并且这些任务可能会阻塞由公共线程池创建的进程。因此,它会导致任务的速度变慢,需要更长的时间来执行。
在这些情况下,最好使用并行流组合的自定义线程池。
看看下面的例子,我们使用ForkJoinPool
创建了5个线程并且在线程创建了一个并行流,以查找给定范围内所有数字的总和。
package com.javaprogramto.java8.streams.parallel.streams;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class CustomPoolParallelStreams {
public static void main(String[] args) throws ExecutionException, InterruptedException {
parallelStreamProcess();
}
private static void parallelStreamProcess() throws ExecutionException, InterruptedException {
int start = 1;
int end = 10000;
List<Integer> intList = IntStream.rangeClosed(start, end).boxed()
.collect(Collectors.toList());
System.out.println(intList.size());
ForkJoinPool newCustomThreadPool = new ForkJoinPool(5);
int actualTotal = newCustomThreadPool.submit(
() -> {
int a = intList.stream().parallel().reduce(0, Integer::sum).intValue();
return a;
}).get();
System.out.println("actualTotal " + actualTotal);
}
}
Output:
[10000
actualTotal 50005000]
实际上,上面的程序并没有给出有效的解决方案,不过我看到很多网站都在讨论这个解决方案。事实上,这段代码在ForkJoinPool中创建了一个并行流,在线程内部再次使用ForkJoinPool区域的公共池中的线程。
因此,如果您正在运行多个并行流,那不要使用这个Steam API的并行方法,因为这可能会减慢其他流的速度,从而用更多的时间给出结果。
在这段程序中,我们将线程池计数设为5,当然你可以根据你的CPU配置进行更改。如果你有更多的任务,那么你可以根据其他任务进行微调。
如果你只有一个并行流,那么你可以使用一个固定线程个数的线程池。
如果上述都不能满足,请等待Java的更新,并行流可以将ForkJoinPool作为输入来限制并行进程的数量
4. 结论
在本文中,您已经看到了如何在JavaStreamAPI中创建并行流,并行流API使用来自ForkJoinPool的公共共享线程池。但是,这是所有其他并行线程共享的,因此最好避免使用并行流,但是您可以使用第二种方法限制线程的数量。而且你还必须考虑使用第二种方法也有一些缺点。
只需等待官方oracle的新并行流api。
本文中显示的所有代码都是通过GitHub实现的。
您可以直接下载项目,并且可以在本地运行,无任何错误。
如果你有其他的见解,请在评论区留言谈论。
原文链接:Java 8 Parallel Streams - Custom Thread Pools Examples | Java Code Geeks - 2021