一些应用可能对每个处理器内核分别使用一个线程,来完成计算密集型任务,如图像或视频处理。Java SE 7中新引入了fork-join框架,专门用来处理此类问题。
假设有一个处理任务,它可以很自然地分解为子任务。图像处理就是这样一个例子。要增强一个图像,可以变换上半部分和下半部分。如果有足够多空闲的处理器,这些操作可以并行运行。假设想统计一个数组中有多少个元素满足某个特定的属性。可以将这个数组一分为二,分别对这两部分进行统计,再将结果相加。
要采用框架可用的一种方式来完成这种递归运算,需要提供一个扩展RecursiveTask<T>的类(如果计算会生成一个类型为T的结果)或者提供一个扩展RecursiveAction的类(如果不生成任何结果)。再覆盖compute方法来生成并调用子任务,然后合并其结果。
class Counter extends RecursiveTask<Integer> {
public static final int THRESHOLD = 1000;
private double[] values;
private int from;
private int to;
private DoublePredicate filter;
public Counter (double[] values, int from, int to, DoublePredicate filter)
{
this.values = values;
this.from = from;
this.to = to;
this.filter = filter;
}
protected Integer compute() {
if (to - from < THRESHOLD) {
int count = 0;
for (int i = from; i < to; i++) {
if (filter.test(values[i])) count++;
}
return count;
}
else {
int mid = (from + to) / 2;
Counter first = new Counter(values, from, mid, filter);
Counter second = new Counter(values, mid, to, filter);
//invokeAll方法接收到很多任务并阻塞
invokeAll(first, second);
return first.join() + second.join();
}
}
}
class ForkJoinTest {
public static void main (String[] args) {
final int SIZE = 1000000;
double numbers = new double[SIZE];
for (int i = 0; i < SIZE; i++) numbers[i] = Math.random();
Counter counter = new Counter(numbers, 0, numbers.length, x -> x > 0.5);
//ForkJoin线程池
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(counter);
System.out.println(counter.join);
}
}
invokeAll方法接收到很多任务并阻塞,直到所有这些任务都已经完成。join方法将生成结果。对每个子任务应用了join,并返回其总和。
在后台,fork-join框架使用了一种有效的智能方法来平衡可用线程的工作负载称为工作密取(work stealing)。每个工作线程都有一个双端队列来完成任务。一个工作线程将子任务压入其双端队列的队头。(只有一个线程可以访问队头,所以不需要加锁)。一个工作线程空闲时,它会从另一个双端队列的队尾“密取”一个任务。由于大的子任务都在队尾,这种密取很少出现。